Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2016/01/19 22:36:27 UTC

[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/1524

    FLINK-2380: allow to specify the default filesystem scheme in the flink configuration file.

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink fs_param

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1524.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1524
    
----
commit ef6431f586f983c2b0ba9318cc4046c3b348a742
Author: Kostas Kloudas <kk...@gmail.com>
Date:   2016-01-19T15:42:33Z

    FLINK-2380: allow the specification of a default filesystem scheme in the flink configuration file.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50253291
  
    --- Diff: docs/setup/config.md ---
    @@ -52,6 +52,14 @@ The configuration files for the TaskManagers can be different, Flink does not as
     - `parallelism.default`: The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program's execution. **Note**: The default parallelism can be overwriten for an entire job by calling `setParallelism(int parallelism)` on the `ExecutionEnvironment` or by passing `-p <parallelism>` to the Flink Command-line frontend. It can be overwritten for single transformations by calling `setParallelism(int
     parallelism)` on an operator. See the [programming guide]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for more information about the parallelism.
     
    +- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority (if needed). 
    +By default, this is set to `file:///` which points to the local filesystem. This means that the local 
    --- End diff --
    
    Actually, it must be three. For the local filesystem the authority is empty, which is denoted by having nothing between the first two slashes and the third.
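
The point about the empty authority can be checked with the JDK's `java.net.URI`. The following is a minimal sketch, not code from the pull request; the class and method names are made up for illustration. It splits a URI string into scheme, authority and path, and shows that the two-slash form does not parse at all.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative only: this class is hypothetical and not part of Flink.
final class AuthorityDemo {

    /** Returns "scheme|authority|path", printing "(none)" when there is no authority. */
    static String describe(String uriString) {
        URI uri = URI.create(uriString);
        String authority = uri.getAuthority() == null ? "(none)" : uri.getAuthority();
        return uri.getScheme() + "|" + authority + "|" + uri.getPath();
    }

    /** Returns true if java.net.URI accepts the string. */
    static boolean isParseable(String uriString) {
        try {
            new URI(uriString);
            return true;
        } catch (URISyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("file:///"));               // empty authority, path "/"
        System.out.println(describe("hdfs://localhost:9000/")); // authority "localhost:9000"
        System.out.println(isParseable("file://"));             // two slashes, nothing after them
    }
}
```

With three slashes the authority slot sits between the second and third slash and is simply left empty, so the path starts at the third slash.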



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-184703065
  
    Perfect! Thanks a lot @rmetzger 



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50252252
  
    --- Diff: docs/setup/config.md ---
    @@ -52,6 +52,14 @@ The configuration files for the TaskManagers can be different, Flink does not as
     - `parallelism.default`: The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program's execution. **Note**: The default parallelism can be overwriten for an entire job by calling `setParallelism(int parallelism)` on the `ExecutionEnvironment` or by passing `-p <parallelism>` to the Flink Command-line frontend. It can be overwritten for single transformations by calling `setParallelism(int
     parallelism)` on an operator. See the [programming guide]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for more information about the parallelism.
     
    +- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority (if needed). 
    +By default, this is set to `file:///` which points to the local filesystem. This means that the local 
    +filesystem is going to be used to search for user-specified files **without** an explicit scheme 
    +definition. As another example, if this is set to `hdfs://localhost:9000/`, then a user-specified file path 
    +without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to be transformed into 
    +`hdfs://localhost:9000/user/USERNAME/in.txt`, and a HDFS deployment is going to be used with its NameNode 
    --- End diff --
    
    I would remove the last part after the comma: `, and a HDFS...port 9000`.
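
The transformation described in the quoted doc text can be sketched with plain `java.net.URI`. This is an assumption about the behaviour, not the code under review: the class `DefaultSchemeResolution` and the method `applyDefault` are hypothetical names. A path without a scheme receives the scheme and authority of the configured default, while a URI that already names a scheme is returned untouched.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper for illustration; not the implementation in the pull request.
final class DefaultSchemeResolution {

    /** Applies the default scheme and authority only when the user path has no scheme. */
    static URI applyDefault(URI defaultScheme, String userPath) {
        URI uri = URI.create(userPath);
        if (uri.getScheme() != null) {
            return uri; // an explicit scheme always wins
        }
        try {
            return new URI(defaultScheme.getScheme(), defaultScheme.getAuthority(),
                    uri.getPath(), null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot combine default scheme and path", e);
        }
    }

    public static void main(String[] args) {
        URI hdfsDefault = URI.create("hdfs://localhost:9000/");
        System.out.println(applyDefault(hdfsDefault, "/user/USERNAME/in.txt"));
        System.out.println(applyDefault(hdfsDefault, "file:///tmp/in.txt"));
    }
}
```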



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-173212170
  
    Hey Klou! Nice addition. A couple of users have asked for this, so I guess they will be happy. I made some minor comments inline. The changes look good. I haven't tested it manually yet, but I've noticed that there are no test cases. I think we should add a simple test that ensures the basic expected behaviour (the default scheme is `file`, the configured default scheme is used if specified, the default scheme is not used if one is explicit in the URI, etc.). After adding the tests and trying it out manually, I think it will be good to merge. :)
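
The basic behaviours asked for here could be checked along the following lines. This is only a sketch: `java.util.Properties` stands in for Flink's `Configuration`, and the class `DefaultSchemeLookup` with its two methods is hypothetical. Only the key `fs.default-scheme` and the fallback `file:///` are taken from the pull request.

```java
import java.net.URI;
import java.util.Properties;

// Stand-in for illustration; java.util.Properties replaces Flink's Configuration here.
final class DefaultSchemeLookup {

    static final String KEY = "fs.default-scheme"; // key named in the pull request
    static final String FALLBACK = "file:///";     // default named in the pull request

    /** Reads the configured default scheme, falling back to the local filesystem. */
    static URI defaultScheme(Properties conf) {
        return URI.create(conf.getProperty(KEY, FALLBACK));
    }

    /** A user-supplied URI needs the default only if it names no scheme itself. */
    static boolean needsDefault(String userUri) {
        return URI.create(userUri).getScheme() == null;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(defaultScheme(conf)); // nothing configured: local filesystem
        conf.setProperty(KEY, "hdfs://localhost:9000/");
        System.out.println(defaultScheme(conf)); // configured value is used
        System.out.println(needsDefault("/user/USERNAME/in.txt"));
        System.out.println(needsDefault("hdfs://localhost:9000/user/USERNAME/in.txt"));
    }
}
```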



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r53014739
  
    --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---
    @@ -176,6 +184,8 @@ abstract class ApplicationMasterBase {
             jobManagerPort, webServerPort, slots, taskManagerCount,
             dynamicPropertiesEncodedString)
     
    +      //todo should I also set the FS default here????
    --- End diff --
    
    @rmetzger Yes, I know. That comment was left over from an earlier version.



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50252906
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---
    @@ -146,6 +147,15 @@ public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
     
     		flinkYarnClient.setConfigurationFilePath(confPath);
     
    +		try {
    +			FileSystem.setDefaultScheme(flinkConfiguration);
    +		} catch (IOException e) {
    +			LOG.error("Error while setting the default " +
    --- End diff --
    
    In the other CLI you rethrow the exception. I would also throw it here. In any case best to have it consistent imo.
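
One way to keep both command-line entry points consistent is to route them through a single helper that rethrows instead of logging. The sketch below is an assumption about how that could look, not the code in the pull request; the class name, method names and exception messages are all hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical sketch; names and messages are illustrative only.
final class ConsistentFailure {

    /** Parses the configured value, reporting a malformed URI as an IOException. */
    static URI parseDefaultScheme(String value) throws IOException {
        try {
            return new URI(value);
        } catch (URISyntaxException e) {
            throw new IOException("The default filesystem scheme ('" + value + "') is invalid.", e);
        }
    }

    /** Shared by every entry point, so each one fails in the same way. */
    static URI configureOrFail(String value) {
        try {
            return parseDefaultScheme(value);
        } catch (IOException e) {
            // Rethrow instead of only logging, so the caller cannot continue half-configured.
            throw new RuntimeException("Error while setting the default filesystem scheme.", e);
        }
    }
}
```

Calling `configureOrFail` from both CLIs means a malformed value stops either of them with the same exception chain.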



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-182054657
  
    I'm testing the change on a cluster (with YARN) to see if everything is working as expected.



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-182066864
  
    Setting the value to `fs.default-scheme: thisIsWrong:///` fails with a clear error, which is good:
    
    ```
    robert@cdh544-master:~/flink/build-target$ ./bin/flink run ./examples/batch/WordCount.jar /user/robert/tpch-100lineitems.csv /user/robert/elasdoijwef
    
    ------------------------------------------------------------
     The program finished with the following exception:
    
    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter:  ))': No file system found with scheme thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/elasdoijwef'.
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
    	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    	at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:804)
    	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:331)
    	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1127)
    	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1175)
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter:  ))': No file system found with scheme thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/elasdoijwef'.
    	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:981)
    	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:965)
    	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:965)
    	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:378)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:105)
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.io.IOException: No file system found with scheme thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/elasdoijwef'.
    	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:290)
    	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
    	at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:275)
    	at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
    	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:977)
    	... 29 more
    ```
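
The URI in the message above has a single slash (`thisIsWrong:/user/...`) even though the configured value has three. One plausible explanation, sketched below under that assumption, is that the default contributes only its scheme and its empty authority, and `java.net.URI` prints no `//` when the authority is absent. The class, the registry of known schemes and the method names are hypothetical; only the message format is copied from the log.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

// Stand-in for illustration; this is not Flink's FileSystem class.
final class SchemeLookupSketch {

    // Hypothetical registry: the schemes this toy example knows about.
    private static final String[] KNOWN_SCHEMES = {"file", "hdfs"};

    /** Prefixes a scheme-less path with the scheme and authority of the default. */
    static URI withDefault(URI defaultScheme, String path) {
        try {
            return new URI(defaultScheme.getScheme(), defaultScheme.getAuthority(), path, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** Fails in the style of the log above when no file system matches the scheme. */
    static void checkScheme(URI uri) throws IOException {
        if (!Arrays.asList(KNOWN_SCHEMES).contains(uri.getScheme())) {
            throw new IOException("No file system found with scheme " + uri.getScheme()
                    + ", referenced in file URI '" + uri + "'.");
        }
    }

    public static void main(String[] args) throws IOException {
        URI uri = withDefault(URI.create("thisIsWrong:///"), "/user/robert/elasdoijwef");
        System.out.println(uri); // no "//" because the authority is empty
        checkScheme(uri);        // throws: the scheme is not registered
    }
}
```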




[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50253229
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---
    @@ -159,56 +164,91 @@ public int hashCode() {
     	/**
     	 * Returns a reference to the {@link FileSystem} instance for accessing the
     	 * local file system.
    -	 * 
    +	 *
     	 * @return a reference to the {@link FileSystem} instance for accessing the
    -	 *         local file system.
    +	 * local file system.
     	 */
     	public static FileSystem getLocalFileSystem() {
     		// this should really never fail.
     		try {
     			URI localUri = OperatingSystem.isWindows() ? new URI("file:/") : new URI("file:///");
     			return get(localUri);
    -		}
    -		catch (Exception e) {
    +		} catch (Exception e) {
     			throw new RuntimeException("Cannot create URI for local file system");
     		}
     	}
     
     	/**
    +	 * The default filesystem scheme to be used. This can be specified by the parameter
    +	 * <code>fs.default-scheme</code> in <code>flink-conf.yaml</code>. By default this is
    +	 * set to <code>file:///</code> and uses the local filesystem.
    +	 * */
    --- End diff --
    
    It should be `*/` at the end



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50252158
  
    --- Diff: docs/setup/config.md ---
    @@ -52,6 +52,14 @@ The configuration files for the TaskManagers can be different, Flink does not as
     - `parallelism.default`: The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program's execution. **Note**: The default parallelism can be overwriten for an entire job by calling `setParallelism(int parallelism)` on the `ExecutionEnvironment` or by passing `-p <parallelism>` to the Flink Command-line frontend. It can be overwritten for single transformations by calling `setParallelism(int
     parallelism)` on an operator. See the [programming guide]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for more information about the parallelism.
     
    +- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority (if needed). 
    --- End diff --
    
    What do you mean by "necessary authority"?



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-183330257
  
    Thanks for the comment @rmetzger. I changed the error message. Please review and let me know.



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r51123441
  
    --- Diff: flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.configuration;
    +
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.testutils.CommonTestUtils;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.PrintWriter;
    +import java.lang.reflect.Field;
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +public class FilesystemSchemeConfigTest {
    --- End diff --
    
    I think you need not worry about the GlobalConfiguration here. The `FileSystem` simply accepts a `Configuration`, which you can simply create without writing files.



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-182951707
  
    I have updated the PR with the new comments.
    Please review the new PR.



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50253093
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
    @@ -303,6 +303,16 @@
     	// ------------------------ File System Behavior ------------------------
     
     	/**
    +	 * Key to specify the default filesystem to be used by a job, e.g.
    +	 * <code>file:///</code> which is the default (see {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME})
    --- End diff --
    
    Same questions as with the docs. Why `file:///`? And I would remove the `HDFS deployment` part at the end.
    
    Furthermore, I would add something along the lines of `This scheme is used <b>ONLY</b> if no other scheme is specified in the user-provided <code>URI</code>` to the docs as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50253565
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---
    @@ -159,56 +164,91 @@ public int hashCode() {
     	/**
     	 * Returns a reference to the {@link FileSystem} instance for accessing the
     	 * local file system.
    -	 * 
    +	 *
     	 * @return a reference to the {@link FileSystem} instance for accessing the
    -	 *         local file system.
    +	 * local file system.
     	 */
     	public static FileSystem getLocalFileSystem() {
     		// this should really never fail.
     		try {
     			URI localUri = OperatingSystem.isWindows() ? new URI("file:/") : new URI("file:///");
     			return get(localUri);
    -		}
    -		catch (Exception e) {
    +		} catch (Exception e) {
     			throw new RuntimeException("Cannot create URI for local file system");
     		}
     	}
     
     	/**
    +	 * The default filesystem scheme to be used. This can be specified by the parameter
    +	 * <code>fs.default-scheme</code> in <code>flink-conf.yaml</code>. By default this is
    +	 * set to <code>file:///</code> and uses the local filesystem.
    +	 * */
    +	private static URI defaultScheme;
    +
    +	/**
    +	 * Sets the default filesystem scheme based on the user-specified configuration parameter
    +	 * <code>fs.default-scheme</code>.
    +	 * <li>
    +	 * As an example, if set to <code>hdfs://localhost:9000/</code>, then an HDFS deployment
    +	 * with the namenode being on the local node and listening to port 9000 is going to be used.
    +	 * In this case, a file path specified as <code>/user/USERNAME/in.txt</code>
    +	 * is going to be transformed into <code>hdfs://localhost:9000/user/USERNAME/in.txt</code>. By
    --- End diff --
    
    Depending on what you think about my comments on the doc text, you could also apply them here.



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50253153
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---
    @@ -46,30 +48,35 @@
     public abstract class FileSystem {
     
     	private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";
    -	
    +
     	private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
     
     	private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";
    -	
    +
     	private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
     
    -	
    -	/** Object used to protect calls to specific methods.*/
    +
    +	/**
    --- End diff --
    
    If this was an auto-reformat: the single-line `/** ... */` comments are OK.



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r53014234
  
    --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---
    @@ -176,6 +184,8 @@ abstract class ApplicationMasterBase {
             jobManagerPort, webServerPort, slots, taskManagerCount,
             dynamicPropertiesEncodedString)
     
    +      //todo should I also set the FS default here????
    --- End diff --
    
    No. I'll remove this TODO when merging



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50253495
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---
    @@ -159,56 +164,91 @@ public int hashCode() {
     	/**
     	 * Returns a reference to the {@link FileSystem} instance for accessing the
     	 * local file system.
    -	 * 
    +	 *
     	 * @return a reference to the {@link FileSystem} instance for accessing the
    -	 *         local file system.
    +	 * local file system.
     	 */
     	public static FileSystem getLocalFileSystem() {
     		// this should really never fail.
     		try {
     			URI localUri = OperatingSystem.isWindows() ? new URI("file:/") : new URI("file:///");
     			return get(localUri);
    -		}
    -		catch (Exception e) {
    +		} catch (Exception e) {
     			throw new RuntimeException("Cannot create URI for local file system");
     		}
     	}
     
     	/**
    +	 * The default filesystem scheme to be used. This can be specified by the parameter
    +	 * <code>fs.default-scheme</code> in <code>flink-conf.yaml</code>. By default this is
    +	 * set to <code>file:///</code> and uses the local filesystem.
    +	 * */
    +	private static URI defaultScheme;
    +
    +	/**
    +	 * Sets the default filesystem scheme based on the user-specified configuration parameter
    +	 * <code>fs.default-scheme</code>.
    +	 * <li>
    --- End diff --
    
    Should be a `<p>`? It's not consistent, but recently I saw the
    
    ```
    <p>TEXT
    ```
    
    more than 
    
    ```
    <p>
    TEXT
    ```



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50253604
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---
    @@ -159,56 +164,91 @@ public int hashCode() {
     	/**
     	 * Returns a reference to the {@link FileSystem} instance for accessing the
     	 * local file system.
    -	 * 
    +	 *
     	 * @return a reference to the {@link FileSystem} instance for accessing the
    -	 *         local file system.
    +	 * local file system.
     	 */
     	public static FileSystem getLocalFileSystem() {
     		// this should really never fail.
     		try {
     			URI localUri = OperatingSystem.isWindows() ? new URI("file:/") : new URI("file:///");
     			return get(localUri);
    -		}
    -		catch (Exception e) {
    +		} catch (Exception e) {
     			throw new RuntimeException("Cannot create URI for local file system");
     		}
     	}
     
     	/**
    +	 * The default filesystem scheme to be used. This can be specified by the parameter
    +	 * <code>fs.default-scheme</code> in <code>flink-conf.yaml</code>. By default this is
    +	 * set to <code>file:///</code> and uses the local filesystem.
    +	 * */
    +	private static URI defaultScheme;
    +
    +	/**
    +	 * Sets the default filesystem scheme based on the user-specified configuration parameter
    +	 * <code>fs.default-scheme</code>.
    +	 * <li>
    +	 * As an example, if set to <code>hdfs://localhost:9000/</code>, then an HDFS deployment
    +	 * with the namenode being on the local node and listening to port 9000 is going to be used.
    +	 * In this case, a file path specified as <code>/user/USERNAME/in.txt</code>
    +	 * is going to be transformed into <code>hdfs://localhost:9000/user/USERNAME/in.txt</code>. By
    +	 * default this is set to <code>file:///</code> which points to the local filesystem.
    +	 * @param config the configuration from where to fetch the parameter.
    +	 * */
    +	public static void setDefaultScheme(Configuration config) throws IOException {
    +		if (defaultScheme == null) {
    +			String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME,
    +				ConfigConstants.DEFAULT_FILESYSTEM_SCHEME);
    +			try {
    +				defaultScheme = new URI(stringifiedUri);
    +			} catch (URISyntaxException e) {
    +				throw new IOException("The URI used to set the default filesystem " +
    +					"scheme ('" + stringifiedUri + "') is not valid.");
    +			}
    +		}
    +	}
    +
    +	/**
     	 * Returns a reference to the {@link FileSystem} instance for accessing the
     	 * file system identified by the given {@link URI}.
    -	 * 
    -	 * @param uri
    -	 *        the {@link URI} identifying the file system
    +	 *
    +	 * @param uri the {@link URI} identifying the file system
     	 * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given
    -	 *         {@link URI}.
    -	 * @throws IOException
    -	 *         thrown if a reference to the file system instance could not be obtained
    +	 * {@link URI}.
    +	 * @throws IOException thrown if a reference to the file system instance could not be obtained
     	 */
     	public static FileSystem get(URI uri) throws IOException {
     		FileSystem fs;
     
     		synchronized (SYNCHRONIZATION_OBJECT) {
    -
     			if (uri.getScheme() == null) {
     				try {
    -					uri = new URI("file", null, uri.getPath(), null);
    -				}
    -				catch (URISyntaxException e) {
    +					if(defaultScheme == null) {
    --- End diff --
    
    formatting: we use `if (...)` (with whitespace after the keyword)



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r51123709
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---
    @@ -159,56 +163,109 @@ public int hashCode() {
     	/**
     	 * Returns a reference to the {@link FileSystem} instance for accessing the
     	 * local file system.
    -	 * 
    +	 *
     	 * @return a reference to the {@link FileSystem} instance for accessing the
    -	 *         local file system.
    +	 * local file system.
     	 */
     	public static FileSystem getLocalFileSystem() {
     		// this should really never fail.
     		try {
     			URI localUri = OperatingSystem.isWindows() ? new URI("file:/") : new URI("file:///");
     			return get(localUri);
    -		}
    -		catch (Exception e) {
    +		} catch (Exception e) {
     			throw new RuntimeException("Cannot create URI for local file system");
     		}
     	}
     
     	/**
    +	 * The default filesystem scheme to be used. This can be specified by the parameter
    +	 * <code>fs.default-scheme</code> in <code>flink-conf.yaml</code>. By default this is
    +	 * set to <code>file:///</code> (see {@link ConfigConstants#FILESYSTEM_SCHEME}
    +	 * and {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}), and uses the local filesystem.
    +	 */
    +	private static URI defaultScheme;
    +
    +	/**
    +	 * <p>Sets the default filesystem scheme based on the user-specified configuration parameter
    +	 * <code>fs.default-scheme</code>. By default this is set to <code>file:///</code>
    +	 * (see {@link ConfigConstants#FILESYSTEM_SCHEME} and
    +	 * {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}),
    +	 * and the local filesystem is used.
    +	 * <p>
    +	 * As an example, if set to <code>hdfs://localhost:9000/</code>, then an HDFS deployment
    +	 * with the namenode being on the local node and listening to port 9000 is going to be used.
    +	 * In this case, a file path specified as <code>/user/USERNAME/in.txt</code>
    +	 * is going to be transformed into <code>hdfs://localhost:9000/user/USERNAME/in.txt</code>. By
    +	 * default this is set to <code>file:///</code> which points to the local filesystem.
    +	 * @param config the configuration from where to fetch the parameter.
    +	 */
    +	public static void setDefaultScheme(Configuration config) throws IOException {
    +		synchronized (SYNCHRONIZATION_OBJECT) {
    +			if (defaultScheme == null) {
    +				String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME,
    +					ConfigConstants.DEFAULT_FILESYSTEM_SCHEME);
    +				try {
    +					defaultScheme = new URI(stringifiedUri);
    +				} catch (URISyntaxException e) {
    +					throw new IOException("The URI used to set the default filesystem " +
    +						"scheme ('" + stringifiedUri + "') is not valid.");
    +				}
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * <p><b>ATTENTION:</b> this method is only used in tests.</p>
    +	 * <p>Clears the previously set <code>fs.default-scheme</code>
    +	 * (see {@link ConfigConstants#FILESYSTEM_SCHEME})</p>
    +	 */
    +	public static void clearDefaultScheme() {
    --- End diff --
    
    This should not be public. Users do not read comments and simply call this. The tests probably need a custom reflection cleaner to reset that.
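
One way to read the "custom reflection cleaner" suggestion is a test-only helper that nulls a private static field through reflection, so the production class needs no public reset method. The following is a minimal sketch under that assumption: `SchemeHolder` is a hypothetical stand-in for `FileSystem`, and `StaticFieldCleaner` is an illustrative name, not an existing Flink utility.

```java
import java.lang.reflect.Field;
import java.net.URI;

// Hypothetical stand-in for a class that caches a default scheme in a private static field.
class SchemeHolder {
	private static URI defaultScheme;

	// Mirrors the "set once" behavior of the quoted diff: later calls are ignored.
	static void setDefaultScheme(URI uri) {
		if (defaultScheme == null) {
			defaultScheme = uri;
		}
	}

	static URI getDefaultScheme() {
		return defaultScheme;
	}
}

// Test-only helper (assumed name): resets a static field without widening the production API.
class StaticFieldCleaner {
	static void clearStaticField(Class<?> clazz, String fieldName) throws ReflectiveOperationException {
		Field field = clazz.getDeclaredField(fieldName);
		field.setAccessible(true);
		// For a static field the instance argument is ignored, so null is passed.
		field.set(null, null);
	}

	public static void main(String[] args) throws Exception {
		SchemeHolder.setDefaultScheme(URI.create("hdfs://localhost:9000/"));
		clearStaticField(SchemeHolder.class, "defaultScheme");
		System.out.println("after reset: " + SchemeHolder.getDefaultScheme());
	}
}
```

A test could call such a helper in its teardown, which keeps the reset logic out of the class that users see.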



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-182065768
  
    I identified the following issues:
    
    - Setting the configuration using the yarn session "dynamic properties": `./bin/yarn-session.sh -n 2 -Dfs.default-scheme=hdfs:///` does not really work (the configuration parameter shows up in the web interface, but the job fails)
    - Setting an invalid scheme leads to a null pointer exception on job submission. In `flink-conf.yaml`, I have `fs.default-scheme: thisIsWrong`. Look at this:
    
    ```
    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter:  ))': null
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
    	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    	at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:804)
    	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:331)
    	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1127)
    	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1175)
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter:  ))': null
    	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:981)
    	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:965)
    	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:965)
    	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:378)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:105)
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.NullPointerException
    	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:255)
    	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
    	at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:275)
    	at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
    	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:977)
    	... 29 more
    robert@cdh544-master:~/flink/build-target$ ./bin/flink run ./examples/batch/WordCount.jar /user/robert/tpch-100lineitems.csv /user/robert/elasdoijwef
    
    ```

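A possible reading of the trace above, offered as an assumption and not as a diagnosis of the code at `FileSystem.java:255`: `java.net.URI` parses a bare word such as `thisIsWrong` as a relative path, so its `getScheme()` returns `null`. The sketch below shows that parsing behavior using only the JDK; `SchemeProbe` is a hypothetical name.

```java
import java.net.URI;
import java.net.URISyntaxException;

class SchemeProbe {
	// Returns the scheme component of the given string, or null when it has none.
	static String schemeOf(String value) throws URISyntaxException {
		return new URI(value).getScheme();
	}

	public static void main(String[] args) throws URISyntaxException {
		String[] candidates = {"thisIsWrong", "thisIsWrong:///", "hdfs://localhost:9000/", "file:///"};
		for (String value : candidates) {
			System.out.println(value + " -> scheme=" + schemeOf(value));
		}
	}
}
```

Any code that dereferences the scheme of the configured default without a null check would fail for the first candidate.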


[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50252356
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---
    @@ -146,6 +147,15 @@ public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
     
     		flinkYarnClient.setConfigurationFilePath(confPath);
     
    +		try {
    +			FileSystem.setDefaultScheme(flinkConfiguration);
    +		} catch (IOException e) {
    +			LOG.error("Error while setting the default " +
    +				"filesystem scheme from configuration.", e);
    +			return null;
    +		}
    +
    +
    --- End diff --
    
    empty line



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-182817638
  
    Thanks @rmetzger for the comment. Will fix it!



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1524



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50252183
  
    --- Diff: docs/setup/config.md ---
    @@ -52,6 +52,14 @@ The configuration files for the TaskManagers can be different, Flink does not as
     - `parallelism.default`: The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program's execution. **Note**: The default parallelism can be overwriten for an entire job by calling `setParallelism(int parallelism)` on the `ExecutionEnvironment` or by passing `-p <parallelism>` to the Flink Command-line frontend. It can be overwritten for single transformations by calling `setParallelism(int
     parallelism)` on an operator. See the [programming guide]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for more information about the parallelism.
     
    +- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority (if needed). 
    +By default, this is set to `file:///` which points to the local filesystem. This means that the local 
    --- End diff --
    
    Should this be `file:///` (two `/`)?
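
For reference when comparing slash counts: the `getLocalFileSystem()` code quoted elsewhere in this thread uses `file:/` on Windows and `file:///` otherwise. The sketch below prints how `java.net.URI` splits a few such strings into components. It uses only the JDK; `UriComponents` and `describe` are hypothetical names.

```java
import java.net.URI;
import java.net.URISyntaxException;

class UriComponents {
	// Summarizes the scheme, authority and path that java.net.URI extracts from a string.
	static String describe(String value) throws URISyntaxException {
		URI uri = new URI(value);
		return "scheme=" + uri.getScheme()
			+ ", authority=" + uri.getAuthority()
			+ ", path=" + uri.getPath();
	}

	public static void main(String[] args) throws URISyntaxException {
		String[] candidates = {"file:/", "file:///", "hdfs://localhost:9000/"};
		for (String value : candidates) {
			System.out.println(value + " -> " + describe(value));
		}
	}
}
```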



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-178507931
  
    Could you explain what additional tests you have in mind? So far I am testing 1) whether the scheme provided in the configuration is used when one is not explicitly provided, 2) whether an explicit scheme overrides the one from the configuration, and 3) whether a scheme from the configuration overrides the default one.
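
The three cases above can be stated as a precedence rule: an explicit scheme in the path wins, then the configured `fs.default-scheme`, then the built-in `file:///`. The following is a minimal sketch of that rule. `SchemeResolver` and `resolve` are hypothetical names for illustration and do not reproduce the `FileSystem.get` logic of the pull request.

```java
import java.net.URI;
import java.net.URISyntaxException;

class SchemeResolver {
	static final String BUILT_IN_DEFAULT = "file:///";

	// configuredDefault may be null, meaning fs.default-scheme is not set.
	static URI resolve(String path, String configuredDefault) throws URISyntaxException {
		URI uri = new URI(path);
		if (uri.getScheme() != null) {
			// Case 2: an explicit scheme always wins.
			return uri;
		}
		// Case 1 and case 3: fall back to the configured value, then to the built-in default.
		URI base = new URI(configuredDefault != null ? configuredDefault : BUILT_IN_DEFAULT);
		// Keep the default's scheme and authority, take the path from the user.
		return new URI(base.getScheme(), base.getAuthority(), uri.getPath(), null, null);
	}

	public static void main(String[] args) throws URISyntaxException {
		System.out.println(resolve("/user/USERNAME/in.txt", "hdfs://localhost:9000/"));
		System.out.println(resolve("file:///tmp/in.txt", "hdfs://localhost:9000/"));
		System.out.println(resolve("/tmp/in.txt", null));
	}
}
```

The sketch assumes scheme-less paths are absolute (start with `/`); a relative path combined with a scheme makes the multi-argument `URI` constructor throw a `URISyntaxException`.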



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r51123237
  
    --- Diff: flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java ---
    @@ -19,13 +19,19 @@
     package org.apache.flink.configuration;
     
     import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    --- End diff --
    
    These added imports are probably not necessary



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-176194702
  
    Thanks for picking up this issue. This is going in a very good direction.
    A few remaining points:
    
      - There are a lot of unrelated reformattings. The general rule is not to reformat comments or indentation for anything not touched in the pull request. Otherwise people just keep reformatting each other's changes back and forth, because everyone believes they personally figured out the better code style ;-) It would be good to undo the changes in the `FileSystem` class, for example.
    
      - Some tests would be great. With refactorings of YARN going on, I can see that this change will be accidentally undone next time something with a main() method is touched, unless a test guards this.



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-184702790
  
    I tested the change again on a cluster. Everything is working nicely with YARN.
    
    I'll merge the PR.



[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-183288223
  
    Thank you.
    
    - ` ./bin/yarn-session.sh -tm 1024 -n 7 -s 2 -Dfs.default-scheme=hdfs:///` is now working
    - Setting the fs.default-scheme to `thisIsWrong` and trying to read from `/user/robert/trashaa` leads to:
    ```
    Caused by: java.io.IOException: The URI '/user/robert/trashaa' is invalid. Hint: Did you specify only the PATH to your file (without the scheme) but forgot the initial slash (/) in the beginning???
    ```
    I don't think putting three question marks there is a good idea. It implies that the user is too stupid to properly specify a file path.
    I would also like to see the default scheme there. Right now, it's quite hard to see what's wrong.
    
    The error reporting for the default scheme `thisIsWrong:///` and the path `/user/robert/trashaa` is good again:
    ```
    Caused by: java.io.IOException: No file system found with scheme thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/trashaaa'.
    ```
    I would expect similar error reporting for `fs.default-scheme=thisIsWrong`.
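
The request above amounts to validating the configured value before any path is resolved against it, and naming that value in the error. The following is a minimal sketch under that assumption. `DefaultSchemeValidator` and the message wording are hypothetical and are not what the pull request ended up with.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

class DefaultSchemeValidator {
	// Parses the configured value and rejects it early if it carries no scheme.
	static URI parseDefaultScheme(String configured) throws IOException {
		final URI uri;
		try {
			uri = new URI(configured);
		} catch (URISyntaxException e) {
			throw new IOException(
				"The value of fs.default-scheme ('" + configured + "') is not a valid URI.", e);
		}
		if (uri.getScheme() == null) {
			throw new IOException(
				"The value of fs.default-scheme ('" + configured + "') has no scheme. "
					+ "Expected something like 'hdfs://host:port/' or 'file:///'.");
		}
		return uri;
	}

	public static void main(String[] args) {
		for (String value : new String[] {"hdfs:///", "thisIsWrong"}) {
			try {
				System.out.println(value + " accepted, scheme=" + parseDefaultScheme(value).getScheme());
			} catch (IOException e) {
				System.out.println(value + " rejected: " + e.getMessage());
			}
		}
	}
}
```

Checking for a missing scheme at configuration time would let the message mention the offending setting instead of the user's file path.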
    
    
    
    




[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r50253320
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---
    @@ -159,56 +164,91 @@ public int hashCode() {
     	/**
     	 * Returns a reference to the {@link FileSystem} instance for accessing the
     	 * local file system.
    -	 * 
    +	 *
     	 * @return a reference to the {@link FileSystem} instance for accessing the
    -	 *         local file system.
    +	 * local file system.
     	 */
     	public static FileSystem getLocalFileSystem() {
     		// this should really never fail.
     		try {
     			URI localUri = OperatingSystem.isWindows() ? new URI("file:/") : new URI("file:///");
     			return get(localUri);
    -		}
    -		catch (Exception e) {
    +		} catch (Exception e) {
     			throw new RuntimeException("Cannot create URI for local file system");
     		}
     	}
     
     	/**
    +	 * The default filesystem scheme to be used. This can be specified by the parameter
    +	 * <code>fs.default-scheme</code> in <code>flink-conf.yaml</code>. By default this is
    --- End diff --
    
    You could also link the `ConfigConstants` keys and default values here

