Posted to issues@flink.apache.org by sachingoel0101 <gi...@git.apache.org> on 2015/08/01 18:02:45 UTC

[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

GitHub user sachingoel0101 opened a pull request:

    https://github.com/apache/flink/pull/970

    [FLINK-2458][FLINK-2449]Access distributed cache entries for CollectionExecution and in Iterative tasks.

    1. This PR adds support for accessing distributed cache entries when running iterations.
    2. Several tests execute in both cluster and collection mode, so a test that passes in one mode should not fail in the other. Distributed cache files are one such case: there is nothing actually wrong with accessing a distributed cache entry when running in a collection environment, even if it doesn't really make sense to do so.
    This PR takes care of that too.
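A simplified, stdlib-only model of what "accessing a distributed cache entry" means inside a user function: a map from registered names to `Future<Path>` copy tasks, plus a blocking lookup by name. `CacheLookupSketch`, `withLocalEntry`, and `getFile` are hypothetical stand-ins, not Flink's `DistributedCache`. The idea is only that a task (iterative, or run by the collection executor) can look a file up if the map it was given actually contains the entry.

```java
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical, simplified model of a distributed-cache lookup (not Flink's class).
final class CacheLookupSketch {
    // Registered name -> pending or finished copy of the file to local disk.
    private final Map<String, Future<Path>> copyTasks;

    CacheLookupSketch(Map<String, Future<Path>> copyTasks) {
        this.copyTasks = copyTasks;
    }

    // Convenience factory for a cache whose single entry is already available locally.
    static CacheLookupSketch withLocalEntry(String name, String localPath) {
        Map<String, Future<Path>> tasks = new HashMap<>();
        tasks.put(name, CompletableFuture.completedFuture(Paths.get(localPath)));
        return new CacheLookupSketch(tasks);
    }

    // Blocks until the copy for `name` has finished, then returns it as a File.
    File getFile(String name) {
        Future<Path> task = copyTasks.get(name);
        if (task == null) {
            throw new IllegalArgumentException("No cached file registered under name: " + name);
        }
        try {
            return task.get().toFile();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for cached file " + name, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Copying cached file " + name + " failed", e.getCause());
        }
    }
}
```

Since the map is passed in once at construction time, repeated lookups in this model (for example, once per superstep) return the same file.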

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sachingoel0101/flink iteration_cache_files

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/970.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #970
    
----
commit c5b33898d1727cd196e7836becc2a30266641eec
Author: Sachin Goel <sa...@gmail.com>
Date:   2015-08-01T13:55:42Z

    [FLINK-2458][FLINK-2449]Access distributed cache entries for
    CollectionExecution and in Iterative tasks.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r37144275
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -897,7 +897,7 @@ class TaskManager(
             config.timeout,
             libCache,
             fileCache,
    -        runtimeInfo)
    +        new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
    --- End diff --
    
    I would prefer it if you opened a second PR once this one is merged. The issues are not really related to each other; the second commit was simply built on top of the first. We would end up having two separate discussions in one PR, which I think is a bad idea.


Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/970#issuecomment-131519550
  
    Reverting to make this PR only about the distributed cache.


Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r36512933
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java ---
    @@ -501,4 +536,22 @@ public int getSuperstepNumber() {
     			return (T) previousAggregates.get(name);
     		}
     	}
    +
    +	private static final class DoingNothing implements Callable<Path>{
    +		private Path entry;
    +
    +		public DoingNothing(Path entry){
    +			this.entry = entry;
    +		}
    +
    +		@Override
    +		public Path call() throws IOException{
    +			try{
    +				LocalFileSystem fs = (LocalFileSystem) entry.getFileSystem();
    +				return entry.isAbsolute() ? new Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry);
    +			} catch (ClassCastException e){
    +				throw new RuntimeException("Collection execution must have only local file paths");
    --- End diff --
    
    I dislike this error message; there is no apparent relation to the distributed cache.
    How about "The DistributedCache only supports local files for Collection Environments."?
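A sketch of the resolution step from the quoted `Callable`, using the suggested message. It assumes `java.net.URI` and `java.nio.file` as stand-ins for Flink's `Path` and `LocalFileSystem`, and checks the URI scheme up front instead of catching a `ClassCastException`; `LocalCachePathResolver` is a hypothetical name.

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical stand-in for the path-resolving Callable in CollectionExecutor,
// using java.net.URI / java.nio.file instead of Flink's Path and LocalFileSystem.
final class LocalCachePathResolver {
    static final String NON_LOCAL_MESSAGE =
            "The DistributedCache only supports local files for Collection Environments.";

    private LocalCachePathResolver() {}

    // Returns an absolute, normalized local path for a registered cache entry.
    // Entries without a scheme or with the "file" scheme are local; anything else is rejected.
    static Path resolve(String entry, Path workingDir) {
        URI uri = URI.create(entry);
        String scheme = uri.getScheme();
        if (scheme != null && !scheme.equals("file")) {
            throw new IllegalArgumentException(NON_LOCAL_MESSAGE);
        }
        Path path = Paths.get(uri.getPath());
        // Relative entries are interpreted against the working directory.
        Path resolved = path.isAbsolute() ? path : workingDir.resolve(path);
        return resolved.normalize();
    }
}
```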


Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/970#issuecomment-131570789
  
    Aside from the comment above, this looks good. Would merge this, after the comment is addressed.


Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/970#issuecomment-131570435
  
    In the `CollectionExecutor`, can you skip creating the `ExecutorService`? You can eagerly resolve the path and then put an already finished future into the map.
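One way to read this suggestion, sketched with hypothetical names (`EagerCacheEntries`, `resolveEagerly`): resolve each local path on the calling thread and store an already-completed future, so no `ExecutorService` has to be created or shut down. `CompletableFuture.completedFuture` requires Java 8; a code base targeting older Java would need a small `Future` implementation that returns its value immediately.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical sketch: build the name -> Future<Path> map for collection execution
// without an ExecutorService.
final class EagerCacheEntries {
    private EagerCacheEntries() {}

    // registered: cache name -> local path as registered by the user.
    static Map<String, Future<Path>> resolveEagerly(Map<String, String> registered, Path workingDir) {
        Map<String, Future<Path>> result = new HashMap<>();
        for (Map.Entry<String, String> entry : registered.entrySet()) {
            Path path = Paths.get(entry.getValue());
            // Resolve on the calling thread; local files need no background copy.
            Path resolved = path.isAbsolute() ? path : workingDir.resolve(path).normalize();
            // Already completed, so a later get() returns immediately.
            result.put(entry.getKey(), CompletableFuture.completedFuture(resolved));
        }
        return result;
    }
}
```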


Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r37144300
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -897,7 +897,7 @@ class TaskManager(
             config.timeout,
             libCache,
             fileCache,
    -        runtimeInfo)
    +        new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
    --- End diff --
    
    Ah. Yes. That makes sense. I will revert this and open a separate PR. Apologies.


Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/970#issuecomment-129648092
  
    I've moved the test to an existing `MultipleProgramTestBase`. Should be good to merge now. :)


Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/970#issuecomment-131570065
  
    We are indeed falling behind on merging pull requests right now. Many committers are on vacation this month, and for the others, the large number of pull requests is hard to keep up with, especially next to the work on our own issues.
    
    Hope this will get better in a week or two.
    
    I'll try to get a look at this very soon...


Posted by hsaputra <gi...@git.apache.org>.
Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r37143896
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -897,7 +897,7 @@ class TaskManager(
             config.timeout,
             libCache,
             fileCache,
    -        runtimeInfo)
    +        new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
    --- End diff --
    
    Why is this changed from before?


Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/970#issuecomment-131504643
  
    I decided to go ahead and implement things which touch the Runtime Context constructors with this PR. This now closes five Jiras, namely 2449, 2458, 2488, 2496 and 2524. Commit messages are descriptive of each Jira.


Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/970#issuecomment-131575300
  
    Addressed comments. @StephanEwen 


Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/970#issuecomment-131579342
  
    Looks good, merging this!


Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/970#issuecomment-128732528
  
    Addressed PR comments. There is one unrelated failure on the GroupReduceITCase. I've filed a JIRA for that.


Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r36513272
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java ---
    @@ -501,4 +536,22 @@ public int getSuperstepNumber() {
     			return (T) previousAggregates.get(name);
     		}
     	}
    +
    +	private static final class DoingNothing implements Callable<Path>{
    +		private Path entry;
    +
    +		public DoingNothing(Path entry){
    +			this.entry = entry;
    +		}
    +
    +		@Override
    +		public Path call() throws IOException{
    +			try{
    +				LocalFileSystem fs = (LocalFileSystem) entry.getFileSystem();
    +				return entry.isAbsolute() ? new Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry);
    +			} catch (ClassCastException e){
    +				throw new RuntimeException("Collection execution must have only local file paths");
    --- End diff --
    
    Yeah. That would make more sense.


Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/970#issuecomment-131496338
  
    I'd like to get this merged soon. This removes multiple constructors for Runtime contexts and establishes a clean hierarchy, making any changes to the constructors easier. This will be useful for two Jiras on exposing task configuration and task attempt number to the Runtime context.
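A minimal sketch of the constructor pattern described here, under assumed names (`BaseContextSketch`, `SimpleContextSketch`; neither is a Flink class): the base class has a single constructor that takes the cache entries, and the convenience constructor delegates to it with an empty map, mirroring the `this(..., new HashMap<String, Future<Path>>(), accumulators)` change in the `RuntimeUDFContext` diff.

```java
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

// Hypothetical, heavily simplified context hierarchy (names are illustrative, not Flink's).
abstract class BaseContextSketch {
    private final String taskName;
    private final Map<String, Future<Path>> cacheEntries;

    // The single constructor every subclass ends up calling.
    protected BaseContextSketch(String taskName, Map<String, Future<Path>> cacheEntries) {
        this.taskName = taskName;
        this.cacheEntries = cacheEntries;
    }

    String getTaskName() { return taskName; }

    Map<String, Future<Path>> getCacheEntries() { return cacheEntries; }
}

final class SimpleContextSketch extends BaseContextSketch {
    // Convenience constructor: delegate with an empty cache instead of a separate code path.
    SimpleContextSketch(String taskName) {
        this(taskName, new HashMap<>());
    }

    SimpleContextSketch(String taskName, Map<String, Future<Path>> cacheEntries) {
        super(taskName, cacheEntries);
    }
}
```

With only one "real" constructor at the top of the hierarchy, adding a new field later means changing one signature and its delegating callers.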


Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r36512101
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---
    @@ -79,7 +68,7 @@ public AbstractRuntimeUDFContext(String name,
     		this.subtaskIndex = subtaskIndex;
     		this.userCodeClassLoader = userCodeClassLoader;
     		this.executionConfig = executionConfig;
    -		this.distributedCache = new DistributedCache(cpTasks);
    +		this.distributedCache = Preconditions.checkNotNull(new DistributedCache(cpTasks));
    --- End diff --
    
    Don't you want to check cpTasks for being null?
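The point of this comment in code: a `new` expression never evaluates to `null`, so a null check wrapped around a constructor call can never fail; the argument is what needs checking. This sketch uses the JDK's `java.util.Objects.requireNonNull` in place of the `Preconditions.checkNotNull` call from the diff, and `CacheHolderSketch` is a hypothetical class.

```java
import java.nio.file.Path;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;

// Hypothetical holder illustrating the review comment (not Flink's AbstractRuntimeUDFContext).
final class CacheHolderSketch {
    private final Map<String, Future<Path>> cpTasks;

    CacheHolderSketch(Map<String, Future<Path>> cpTasks) {
        // Check the argument itself: checkNotNull(new DistributedCache(cpTasks))
        // could never fail, because a constructor call never yields null.
        this.cpTasks = Objects.requireNonNull(cpTasks, "cpTasks must not be null");
    }

    int entryCount() {
        return cpTasks.size();
    }
}
```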


Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r36640260
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java ---
    @@ -501,4 +536,22 @@ public int getSuperstepNumber() {
     			return (T) previousAggregates.get(name);
     		}
     	}
    +
    +	private static final class DoingNothing implements Callable<Path>{
    --- End diff --
    
    It actually does something ;-)


Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r37144194
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -897,7 +897,7 @@ class TaskManager(
             config.timeout,
             libCache,
             fileCache,
    -        runtimeInfo)
    +        new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
    --- End diff --
    
    Yes. Adding the distributed cache removes the need for multiple constructors for `RuntimeContext`s. Since providing access to runtime information required changing the constructors, I thought it better to work with what would be the only constructors needed after this is merged.
    I can revert this commit and open a separate PR for the *other* three issues if necessary.


Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/970#issuecomment-129481422
  
    Looks good, in general.
    
    Can you add the test to one of the other iteration test files? This saves cluster startup and shutdown costs, making builds faster. Maybe to the iteration aggregators, or iteration accumulators.



Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r36680981
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java ---
    @@ -501,4 +536,22 @@ public int getSuperstepNumber() {
     			return (T) previousAggregates.get(name);
     		}
     	}
    +
    +	private static final class DoingNothing implements Callable<Path>{
    --- End diff --
    
    Haha. Yes. In an earlier version of the code, it wasn't. :')


Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r37144152
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -897,7 +897,7 @@ class TaskManager(
             config.timeout,
             libCache,
             fileCache,
    -        runtimeInfo)
    +        new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
    --- End diff --
    
    Generally we try to keep one PR per issue; exceptions should only be made for closely related issues.
    
    Why did you decide to add these issues to this PR? (I have a hard time understanding it, since the commits barely touch the same files.)


Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r36512186
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java ---
    @@ -37,18 +37,17 @@
     	private final HashMap<String, Object> initializedBroadcastVars = new HashMap<String, Object>();
     	
     	private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();
    -	
    -	
    +
     	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
     							ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
    -		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators);
    +		this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig,
    +				new HashMap<String, Future<Path>>(), accumulators);
     	}
    -	
    +
     	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
     							ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
     		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks);
     	}
    -	
    --- End diff --
    
    Here we have a few unnecessary formatting changes that just clutter the diff.


Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/970


Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r36513206
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---
    @@ -79,7 +68,7 @@ public AbstractRuntimeUDFContext(String name,
     		this.subtaskIndex = subtaskIndex;
     		this.userCodeClassLoader = userCodeClassLoader;
     		this.executionConfig = executionConfig;
    -		this.distributedCache = new DistributedCache(cpTasks);
    +		this.distributedCache = Preconditions.checkNotNull(new DistributedCache(cpTasks));
    --- End diff --
    
    Ah yes. Sorry. 


Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/970#discussion_r37143936
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -897,7 +897,7 @@ class TaskManager(
             config.timeout,
             libCache,
             fileCache,
    -        runtimeInfo)
    +        new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
    --- End diff --
    
    This is to provide access to the task attempt number from the runtime context. I should add a description of the other tickets this resolves.
    Is this a good idea though? To fix five issues in one PR? Or should I open a separate one and keep this one for just distributed cache?

