Posted to issues@flink.apache.org by wenlong88 <gi...@git.apache.org> on 2017/02/22 09:39:19 UTC

[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

GitHub user wenlong88 opened a pull request:

    https://github.com/apache/flink/pull/3388

    [FLINK-5815] Add resource files configuration for Yarn Mode

    This PR adds three common resource configuration options to YARN mode. They allow users to ship individual files from both the local file system and a remote HDFS file system, as MapReduce allows:
    1. -yfiles and -ylibjars add local resource files to a YARN per-job cluster: the user provides a list of file paths to put local jars or dictionary files into the YARN distributed cache.
    2. -yarchives adds remote resource files to a YARN per-job cluster: the user provides a list of URIs of files, which can be stored on HDFS, and can rename each file through the fragment of its URI.
    All of the files are distributed by YARN to every TM and JM and added to the classpath of the TM and JM.
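The renaming rule in item 2 can be illustrated with a small sketch. It assumes, as the description says, that a fragment on the archive URI becomes the file's name on the TM/JM side and that the plain file name is used otherwise; `ArchiveNaming` and `linkName` are hypothetical names, not part of the PR.

```java
import java.net.URI;

final class ArchiveNaming {

    // Hypothetical helper: the name under which a shipped file would appear.
    // A non-empty URI fragment wins; otherwise the last path segment is used.
    static String linkName(URI archive) {
        String fragment = archive.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment;
        }
        String path = archive.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }
}
```

With this rule, `hdfs:///users/flink/common_dict#dict` would be visible as `dict`, while `hdfs:///users/flink/stopwords.txt` would keep the name `stopwords.txt`.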

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wenlong88/flink jira-5815

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3388.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3388
    
----
commit 77f27600368de02c26d9a45b2e575585728a2ddd
Author: wenlong.lwl <we...@alibaba-inc.com>
Date:   2017-01-04T02:52:31Z

    add -yfiles -ylibjars -yarchives for yarn resource file management

commit e8957721e951d16861aff27af4d58e5ac42ec81b
Author: wenlong.lwl <we...@alibaba-inc.com>
Date:   2017-02-22T09:35:55Z

    remove useless change

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103197159
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
    @@ -136,21 +137,36 @@ public static Path setupLocalResource(
     			Path homedir) throws IOException {
     
     		// copy resource to HDFS
    -		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
    -
    -		Path dst = new Path(homedir, suffix);
    +		Path dst = getRemoteResourceRoot(appId, localRsrcPath, homedir);
     
     		LOG.info("Copying from " + localRsrcPath + " to " + dst);
     		fs.copyFromLocalFile(localRsrcPath, dst);
     		registerLocalResource(fs, dst, appMasterJar);
     		return dst;
     	}
     
    +	public static Path getRemoteResourceRoot(
    --- End diff --
    
    Maybe it would be better to call it `getRemoteResourcePath`.
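For reference, the removed lines in this hunk build the upload location as `<home dir>/.flink/<appId>/<file name>`. The sketch below shows what a helper with the suggested name would compute; plain strings stand in for Hadoop's `Path` so that it needs no Hadoop dependency, and `RemoteResourcePaths` is a hypothetical class.

```java
final class RemoteResourcePaths {

    // Sketch of the layout used by Utils.setupLocalResource:
    // <homeDir>/.flink/<appId>/<fileName>
    static String getRemoteResourcePath(String homeDir, String appId, String fileName) {
        // Join with exactly one slash, as Path(parent, child) would.
        String base = homeDir.endsWith("/") ? homeDir : homeDir + "/";
        return base + ".flink/" + appId + "/" + fileName;
    }
}
```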



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103197726
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -131,6 +134,8 @@
     	/** Lazily initialized list of files to ship */
     	protected List<File> shipFiles = new LinkedList<>();
     
    +	private List<URI> archives = new LinkedList<>();
    --- End diff --
    
    `LinkedList`s are not very fast. It is better to use an `ArrayList` and initialize it with an initial capacity.



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103194179
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileUtilsTest.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.test.util.AbstractTestBase;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.*;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Created by wenlong.lwl on 2017/2/21.
    + */
    +public class FileUtilsTest {
    +	@ClassRule
    +	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File TEMP_DIR;
    +
    +	private static String HDFS_ROOT_URI;
    +
    +	private static MiniDFSCluster HDFS_CLUSTER;
    +
    +	private static FileSystem FS;
    +
    +	// ------------------------------------------------------------------------
    +	//  startup / shutdown
    +	// ------------------------------------------------------------------------
    +
    +	@BeforeClass
    +	public static void createHDFS() {
    +		try {
    +			TEMP_DIR = temporaryFolder.newFolder();
    +
    +			Configuration hdConf = new Configuration();
    +			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEMP_DIR.getAbsolutePath());
    +			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
    +			HDFS_CLUSTER = builder.build();
    +
    +			HDFS_ROOT_URI = "hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":"
    +				+ HDFS_CLUSTER.getNameNodePort() + "/";
    +
    +			FS = FileSystem.get(new URI(HDFS_ROOT_URI));
    +		}
    +		catch (Exception e) {
    +			e.printStackTrace();
    +			fail("Could not create HDFS mini cluster " + e.getMessage());
    +		}
    +	}
    +
    +	@AfterClass
    +	public static void destroyHDFS() {
    +		try {
    +			HDFS_CLUSTER.shutdown();
    +		}
    +		catch (Exception ignored) {}
    +	}
    +
    +	@Test
    +	public void testCompareFs() throws IOException {
    +		File localDir = temporaryFolder.newFolder();
    +		Assert.assertFalse(FileUtils.compareFs(new Path(localDir.toURI()).getFileSystem(), FS));
    --- End diff --
    
    For this test, I don't think we need to start an HDFS cluster just to obtain an HDFS `FileSystem`. Can't we create the respective URIs instead?
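`compareFs` only inspects each file system's URI (scheme, port, host), so one way to test that logic without a `MiniDFSCluster` is to apply the same checks to hand-built URIs. This is a sketch under that assumption: `sameFileSystem` is a hypothetical URI-level variant, and it deliberately omits the canonical-hostname DNS lookup so the test stays offline.

```java
import java.net.URI;
import java.util.Objects;

final class FsUriComparison {

    // Hypothetical URI-level variant of compareFs: same scheme, same port, same host.
    // Unlike the PR's version, it does not resolve canonical host names via DNS.
    static boolean sameFileSystem(URI src, URI dst) {
        if (src.getScheme() == null || !src.getScheme().equals(dst.getScheme())) {
            return false;
        }
        if (src.getPort() != dst.getPort()) {
            return false;
        }
        // Both hosts null, or both equal.
        return Objects.equals(src.getHost(), dst.getHost());
    }
}
```

A test can then compare `file:///tmp/` against `hdfs://localhost:9000/` directly, with no cluster involved.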



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878609
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -131,6 +134,8 @@
     	/** Lazily initialized list of files to ship */
     	protected List<File> shipFiles = new LinkedList<>();
     
    +	private List<URI> archives = new LinkedList<>();
    --- End diff --
    
    This is because we cannot know the size in advance, and there are usually only a few archives, so I kept the same structure as `shipFiles`.
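If the archive list arrives as a single option value, its size is known once the value has been split, so an `ArrayList` could still be presized as suggested. A sketch, assuming a comma-separated option value (the PR does not state the delimiter); `ArchiveListParser.parseArchives` is a hypothetical helper.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

final class ArchiveListParser {

    // Hypothetical helper: turns "uri1,uri2,..." into a presized list of URIs.
    static List<URI> parseArchives(String optionValue) {
        String[] parts = optionValue.split(",");
        // The entry count is known here, so the backing array is allocated once.
        List<URI> archives = new ArrayList<>(parts.length);
        for (String part : parts) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                archives.add(URI.create(trimmed));
            }
        }
        return archives;
    }
}
```

For a handful of archives the difference from a `LinkedList` is negligible either way, which is the author's point.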



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103195968
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -742,6 +755,24 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
     			}
     		}
     
    +		for (URI originURI : archives) {
    +			Path remoteParent = Utils.getRemoteResourceRoot(appId.toString(), new Path(originURI.getPath()), fs.getHomeDirectory());
    +			String fragment = originURI.getFragment();
    +			Path target = new Path(
    +				FileUtils.localizeRemoteFiles(new org.apache.flink.core.fs.Path(remoteParent.toUri()), originURI).toUri());
    +			URI targetURI = target.toUri();
    +			if (targetURI.getFragment() == null && fragment != null) {
    +				targetURI = new URI(target.toUri().toString() + "#" + fragment);
    --- End diff --
    
    Why do we need to append a fragment here? I thought that if `remoteParent` contained a fragment, then `target` would be a file named after that fragment.



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878685
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -742,6 +755,24 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
     			}
     		}
     
    +		for (URI originURI : archives) {
    +			Path remoteParent = Utils.getRemoteResourceRoot(appId.toString(), new Path(originURI.getPath()), fs.getHomeDirectory());
    +			String fragment = originURI.getFragment();
    +			Path target = new Path(
    +				FileUtils.localizeRemoteFiles(new org.apache.flink.core.fs.Path(remoteParent.toUri()), originURI).toUri());
    +			URI targetURI = target.toUri();
    +			if (targetURI.getFragment() == null && fragment != null) {
    +				targetURI = new URI(target.toUri().toString() + "#" + fragment);
    +			}
    +			LocalResource archive = Records.newRecord(LocalResource.class);
    +			FileStatus state = fs.getFileStatus(target);
    +			Utils.registerLocalResource(targetURI, state.getLen(), state.getModificationTime(), archive);
    +			localResources.put(fragment, archive);
    --- End diff --
    
    fixed.



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878668
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -742,6 +755,24 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
     			}
     		}
     
    +		for (URI originURI : archives) {
    +			Path remoteParent = Utils.getRemoteResourceRoot(appId.toString(), new Path(originURI.getPath()), fs.getHomeDirectory());
    +			String fragment = originURI.getFragment();
    +			Path target = new Path(
    +				FileUtils.localizeRemoteFiles(new org.apache.flink.core.fs.Path(remoteParent.toUri()), originURI).toUri());
    +			URI targetURI = target.toUri();
    +			if (targetURI.getFragment() == null && fragment != null) {
    +				targetURI = new URI(target.toUri().toString() + "#" + fragment);
    --- End diff --
    
    You are right about the file naming rule. When `Path` is used as the output type of localizeFile, the URI fragment is dropped, so we had to recover the fragment there. I have changed the return type to `URI`, so this is no longer needed.
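Returning a `URI` avoids the round trip altogether. Where a fragment does have to be re-attached, `java.net.URI`'s three-argument constructor `URI(scheme, ssp, fragment)` does it without string concatenation and quotes illegal characters (such as spaces) in the fragment. A sketch; `FragmentSketch.withFragment` is a hypothetical helper and the paths are made up.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class FragmentSketch {

    // Hypothetical helper: returns `target` with `fragment` attached, unless there is
    // nothing to attach or `target` already carries a fragment of its own.
    static URI withFragment(URI target, String fragment) throws URISyntaxException {
        if (fragment == null || target.getFragment() != null) {
            return target;
        }
        return new URI(target.getScheme(), target.getSchemeSpecificPart(), fragment);
    }
}
```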



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878983
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java ---
    @@ -85,6 +89,10 @@ public FlinkYarnCLI(String shortPrefix, String longPrefix) {
     		DETACHED = new Option(shortPrefix + "a", longPrefix + "attached", false, "Start attached");
     		ZOOKEEPER_NAMESPACE = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
     
    +		LIB_JARS = new Option(shortPrefix + "libjars", longPrefix + "libjars", true, "Jar file paths for job, like /home/user/lib/test.jar");
    +		FILES = new Option(shortPrefix + "files", longPrefix + "files", true, "Normal file paths for job, like /home/user/lib/test.dict");
    +		ARCHIVES = new Option(shortPrefix + "archives", longPrefix + "archives", true, "Archived file uris for job, like hdfs:///users/flink/common_dict#dict");
    --- End diff --
    
    Using the BlobStore is a totally different approach. For clusters with a BlobStore, the DistributedCache may be enough. In the future, I think we can support the DistributedCache on top of the BlobStore, and also support external BlobStore implementations that run on existing frameworks, such as the YARN distributed cache.



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103196509
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -742,6 +755,24 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
     			}
     		}
     
    +		for (URI originURI : archives) {
    +			Path remoteParent = Utils.getRemoteResourceRoot(appId.toString(), new Path(originURI.getPath()), fs.getHomeDirectory());
    +			String fragment = originURI.getFragment();
    +			Path target = new Path(
    +				FileUtils.localizeRemoteFiles(new org.apache.flink.core.fs.Path(remoteParent.toUri()), originURI).toUri());
    +			URI targetURI = target.toUri();
    +			if (targetURI.getFragment() == null && fragment != null) {
    +				targetURI = new URI(target.toUri().toString() + "#" + fragment);
    +			}
    +			LocalResource archive = Records.newRecord(LocalResource.class);
    +			FileStatus state = fs.getFileStatus(target);
    +			Utils.registerLocalResource(targetURI, state.getLen(), state.getModificationTime(), archive);
    +			localResources.put(fragment, archive);
    +			paths.add(new Path(targetURI));
    --- End diff --
    
    Isn't that the same as `target`?



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878445
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    +			return false;
    +		}
    +		if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    +			return false;
    +		}
    +
    +		// check ports
    +		if (srcUri.getPort() != dstUri.getPort()) {
    +			return false;
    +		}
    +
    +		// check ip
    +		String srcHost = srcUri.getHost();
    +		String dstHost = dstUri.getHost();
    +		if ((srcHost != null) && (dstHost != null)) {
    +			if (!srcHost.equals(dstHost)) {
    +				try {
    +					srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
    +					dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
    +				} catch (UnknownHostException ue) {
    +					return false;
    +				}
    +				if (!srcHost.equals(dstHost)) {
    +					return false;
    +				}
    +			}
    +		} else if (srcHost == null && dstHost != null) {
    +			return false;
    +		} else if (srcHost != null && dstHost == null) {
    +			return false;
    +		}
    +		return true;
    +	}
    +
    +	/**
    +	 * Check where the original file belongs to the same filesystem as the local dir, if not copy the remote file to
    +	 * local dir. If the original File uri has a fragment, the fragment will be used as local file name.
    +	 *
    +	 * @param localDir      local directory to store remote files.
    +	 * @param originalFile  original file to check
    +	 * @return
    +	 * @throws IOException
    +	 */
    +	public static Path localizeRemoteFiles(Path localDir, URI originalFile) throws IOException {
    --- End diff --
    
    fixed



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878488
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileUtilsTest.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.test.util.AbstractTestBase;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.*;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Created by wenlong.lwl on 2017/2/21.
    --- End diff --
    
    Sorry, this is a leftover from our code base that I missed when cleaning up.



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878472
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    +			return false;
    +		}
    +		if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    +			return false;
    +		}
    +
    +		// check ports
    +		if (srcUri.getPort() != dstUri.getPort()) {
    +			return false;
    +		}
    +
    +		// check ip
    +		String srcHost = srcUri.getHost();
    +		String dstHost = dstUri.getHost();
    +		if ((srcHost != null) && (dstHost != null)) {
    +			if (!srcHost.equals(dstHost)) {
    +				try {
    +					srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
    +					dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
    +				} catch (UnknownHostException ue) {
    +					return false;
    +				}
    +				if (!srcHost.equals(dstHost)) {
    +					return false;
    +				}
    +			}
    +		} else if (srcHost == null && dstHost != null) {
    +			return false;
    +		} else if (srcHost != null && dstHost == null) {
    +			return false;
    +		}
    +		return true;
    +	}
    +
    +	/**
    +	 * Check where the original file belongs to the same filesystem as the local dir, if not copy the remote file to
    +	 * local dir. If the original File uri has a fragment, the fragment will be used as local file name.
    +	 *
    +	 * @param localDir      local directory to store remote files.
    +	 * @param originalFile  original file to check
    +	 * @return
    +	 * @throws IOException
    +	 */
    +	public static Path localizeRemoteFiles(Path localDir, URI originalFile) throws IOException {
    +
    +		Path originalPath = new Path(originalFile);
    +
    +		FileSystem remoteFs = originalPath.getFileSystem();
    +		FileSystem localFs = localDir.getFileSystem();
    +		if (compareFs(remoteFs, localFs)) {
    +			return originalPath;
    +		}
    +
    +		String fragment = originalFile.getFragment();
    +		if(fragment == null) {
    +			fragment = originalPath.getName();
    +		}
    +		Path newPath = new Path(localDir, fragment);
    +		IOUtils.copyBytes(remoteFs.open(originalPath), localFs.create(newPath, FileSystem.WriteMode.OVERWRITE), true);
    --- End diff --
    
    fixed



[GitHub] flink issue #3388: [FLINK-5815] Add resource files configuration for Yarn Mo...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on the issue:

    https://github.com/apache/flink/pull/3388
  
    @tillrohrmann Sorry for the late response; I have addressed the comments. Thanks for the review.
    
    For standalone and Mesos there are already some ways to do similar things, such as `--classpath` and the `DistributedCache` in batch jobs. In the future we need to do some refactoring and make all of this clearer to users before adding yet another mechanism. But this change is necessary to make the FLIP-6 implementation run well on YARN.
    
    The test is not added because `flink-yarn-test` is disabled and I was not able to enable it. Is there a problem with running the YARN tests at the moment?



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r102957246
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    --- End diff --
    
    What if `srcFs.getScheme() == null` and `destFs.getScheme() == null`?
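As written, `compareFs` returns `false` as soon as the source scheme is `null`, so two scheme-less URIs are never treated as the same file system. Whether they should be is a design decision (both might refer to the configured default file system). If they should, `Objects.equals` gives a null-safe comparison. A minimal sketch with a hypothetical `schemesMatch` helper:

```java
import java.net.URI;
import java.util.Locale;
import java.util.Objects;

final class SchemeComparison {

    // Hypothetical null-safe scheme check: two missing schemes count as a match,
    // a scheme on only one side does not. Scheme names are case-insensitive
    // (RFC 3986), so both sides are lower-cased before comparing.
    static boolean schemesMatch(URI src, URI dst) {
        return Objects.equals(normalize(src.getScheme()), normalize(dst.getScheme()));
    }

    private static String normalize(String scheme) {
        return scheme == null ? null : scheme.toLowerCase(Locale.ROOT);
    }
}
```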



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103178916
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    +			return false;
    +		}
    +		if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    +			return false;
    +		}
    +
    +		// check ports
    +		if (srcUri.getPort() != dstUri.getPort()) {
    +			return false;
    +		}
    +
    +		// check ip
    +		String srcHost = srcUri.getHost();
    +		String dstHost = dstUri.getHost();
    +		if ((srcHost != null) && (dstHost != null)) {
    +			if (!srcHost.equals(dstHost)) {
    +				try {
    +					srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
    +					dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
    +				} catch (UnknownHostException ue) {
    +					return false;
    +				}
    +				if (!srcHost.equals(dstHost)) {
    +					return false;
    +				}
    +			}
    +		} else if (srcHost == null && dstHost != null) {
    +			return false;
    +		} else if (srcHost != null && dstHost == null) {
    +			return false;
    +		}
    +		return true;
    +	}
    +
    +	/**
    +	 * Check where the original file belongs to the same filesystem as the local dir, if not copy the remote file to
    +	 * local dir. If the original File uri has a fragment, the fragment will be used as local file name.
    +	 *
    +	 * @param localDir      local directory to store remote files.
    +	 * @param originalFile  original file to check
    +	 * @return
    +	 * @throws IOException
    --- End diff --
    
    The description is missing.
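One way to fill in the description is to state the naming rule and the overwrite behaviour (the method body uses `WriteMode.OVERWRITE`). The sketch below shows such a Javadoc on a local-only analogue built on `java.nio.file`. It skips the same-file-system check and Flink's `FileSystem`/`IOUtils`, works only for `file:` URIs, and `LocalizeSketch.localize` is hypothetical, not the PR's method.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

final class LocalizeSketch {

    /**
     * Copies the file identified by {@code originalFile} into {@code localDir}.
     * If the URI carries a fragment (e.g. {@code file:///data/common_dict#dict}),
     * the fragment is used as the name of the copy; otherwise the original file
     * name is kept. An existing file with the same name is overwritten.
     *
     * @param localDir     directory that receives the copy
     * @param originalFile {@code file:} URI of the file to copy, optionally with a fragment
     * @return the path of the copy inside {@code localDir}
     * @throws IOException if the URI is invalid, or reading or writing fails
     */
    static Path localize(Path localDir, URI originalFile) throws IOException {
        URI withoutFragment;
        try {
            // Paths.get(URI) rejects URIs with a fragment, so strip it first.
            withoutFragment = new URI(originalFile.getScheme(), originalFile.getSchemeSpecificPart(), null);
        } catch (URISyntaxException e) {
            throw new IOException("Invalid URI: " + originalFile, e);
        }
        Path source = Paths.get(withoutFragment);

        String name = originalFile.getFragment();
        if (name == null) {
            name = source.getFileName().toString();
        }
        Path target = localDir.resolve(name);
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        return target;
    }
}
```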



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103196755
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -742,6 +755,24 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
     			}
     		}
     
    +		for (URI originURI : archives) {
    +			Path remoteParent = Utils.getRemoteResourceRoot(appId.toString(), new Path(originURI.getPath()), fs.getHomeDirectory());
    +			String fragment = originURI.getFragment();
    +			Path target = new Path(
    +				FileUtils.localizeRemoteFiles(new org.apache.flink.core.fs.Path(remoteParent.toUri()), originURI).toUri());
    +			URI targetURI = target.toUri();
    +			if (targetURI.getFragment() == null && fragment != null) {
    +				targetURI = new URI(target.toUri().toString() + "#" + fragment);
    +			}
    +			LocalResource archive = Records.newRecord(LocalResource.class);
    +			FileStatus state = fs.getFileStatus(target);
    +			Utils.registerLocalResource(targetURI, state.getLen(), state.getModificationTime(), archive);
    +			localResources.put(fragment, archive);
    +			paths.add(new Path(targetURI));
    +			classPathBuilder.append(fragment).append(File.pathSeparator);
    +			envShipFileList.append(targetURI).append(",");
    +		}
    --- End diff --
    
    I've got the feeling that this code snippet contains a lot of Path-to-URI and URI-to-Path conversions. Maybe we could streamline it a little to avoid unnecessary object instantiations.



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103196163
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -742,6 +755,24 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
     			}
     		}
     
    +		for (URI originURI : archives) {
    +			Path remoteParent = Utils.getRemoteResourceRoot(appId.toString(), new Path(originURI.getPath()), fs.getHomeDirectory());
    +			String fragment = originURI.getFragment();
    +			Path target = new Path(
    +				FileUtils.localizeRemoteFiles(new org.apache.flink.core.fs.Path(remoteParent.toUri()), originURI).toUri());
    +			URI targetURI = target.toUri();
    +			if (targetURI.getFragment() == null && fragment != null) {
    +				targetURI = new URI(target.toUri().toString() + "#" + fragment);
    --- End diff --
    
    Haven't we already extracted the target URI in `targetURI`?



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103194600
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -248,6 +253,14 @@ public void addShipFiles(List<File> shipFiles) {
     		}
     	}
     
    +	public void addArchives(String archive) {
    +		try {
    +			this.archives.add(new URI(archive));
    +		} catch (URISyntaxException e) {
    +			throw new IllegalArgumentException(e);
    --- End diff --
    
    Maybe we could add a short exception message.
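A minimal sketch of what a more descriptive exception could look like. `ArchiveRegistry` is a hypothetical stand-in for `AbstractYarnClusterDescriptor`; only the body of `addArchives` follows the diff, and the message wording is an assumption.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the descriptor's archive list; only the
// exception handling mirrors the reviewed addArchives method.
final class ArchiveRegistry {

    private final List<URI> archives = new ArrayList<>();

    public void addArchives(String archive) {
        try {
            archives.add(new URI(archive));
        } catch (URISyntaxException e) {
            // Name the offending value so the user can see which archive entry is invalid.
            throw new IllegalArgumentException(
                "Invalid archive URI '" + archive + "': " + e.getMessage(), e);
        }
    }

    public List<URI> getArchives() {
        return archives;
    }
}
```

Keeping the original `URISyntaxException` as the cause preserves the parser's index information while the message tells the user which input was rejected.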



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103177388
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    +			return false;
    +		}
    +		if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    +			return false;
    +		}
    +
    +		// check ports
    +		if (srcUri.getPort() != dstUri.getPort()) {
    +			return false;
    +		}
    +
    +		// check ip
    +		String srcHost = srcUri.getHost();
    +		String dstHost = dstUri.getHost();
    +		if ((srcHost != null) && (dstHost != null)) {
    +			if (!srcHost.equals(dstHost)) {
    +				try {
    +					srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
    +					dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
    --- End diff --
    
    Is it better to try to find out the FQDN for an IP? Isn't it the case that `getCanonicalHostName` can fail? I'm wondering whether comparing the IP addresses wouldn't be more stable.
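For comparison, a sketch of the IP-based check, assuming it is acceptable to treat two hosts as equal when their resolved address sets overlap. `HostComparison` and `sameHost` are hypothetical names. Note that `getCanonicalHostName` is documented as a best-effort method, which is part of why comparing addresses may be more predictable.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class HostComparison {

    /** Returns true if the two host strings resolve to at least one common IP address. */
    public static boolean sameHost(String a, String b) {
        if (a.equalsIgnoreCase(b)) {
            return true; // identical names need no lookup
        }
        try {
            // InetAddress.equals compares the raw address bytes, not the host name.
            Set<InetAddress> left = new HashSet<>(Arrays.asList(InetAddress.getAllByName(a)));
            for (InetAddress addr : InetAddress.getAllByName(b)) {
                if (left.contains(addr)) {
                    return true;
                }
            }
            return false;
        } catch (UnknownHostException e) {
            return false; // unresolvable host: treat as different, like the reviewed code
        }
    }
}
```

Comparing address sets rather than a single address covers hosts with several A records, at the cost of one lookup per host.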



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103195082
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -742,6 +755,24 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
     			}
     		}
     
    +		for (URI originURI : archives) {
    +			Path remoteParent = Utils.getRemoteResourceRoot(appId.toString(), new Path(originURI.getPath()), fs.getHomeDirectory());
    +			String fragment = originURI.getFragment();
    +			Path target = new Path(
    +				FileUtils.localizeRemoteFiles(new org.apache.flink.core.fs.Path(remoteParent.toUri()), originURI).toUri());
    +			URI targetURI = target.toUri();
    +			if (targetURI.getFragment() == null && fragment != null) {
    +				targetURI = new URI(target.toUri().toString() + "#" + fragment);
    --- End diff --
    
    no need for `toString`
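A sketch of attaching the fragment without `toString` and string concatenation, assuming the target is already available as a `java.net.URI`. The three-argument `URI(scheme, ssp, fragment)` constructor takes care of quoting; `FragmentUris.withFragment` is a hypothetical helper name.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class FragmentUris {

    /**
     * Returns target with the given fragment attached, unless target already
     * carries a fragment or no fragment was given. The URI is built from its
     * components instead of concatenating strings.
     */
    public static URI withFragment(URI target, String fragment) throws URISyntaxException {
        if (target.getFragment() != null || fragment == null) {
            return target;
        }
        return new URI(target.getScheme(), target.getSchemeSpecificPart(), fragment);
    }
}
```

With such a helper, the loop would only need to convert the localized path to a URI once and reuse that value.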



[GitHub] flink issue #3388: [FLINK-5815] Add resource files configuration for Yarn Mo...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/3388
  
    Just a note: for Mesos, we'll inevitably add support for arbitrary URLs to be downloaded by the container. The Mesos fetcher deals primarily with HTTP URLs but also supports HDFS URLs (if configured; this seems rarely used).
    
    Stepping back, we need the clustering layer to converge because supporting these features in a one-off way is becoming unsustainable.



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878339
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    --- End diff --
    
    fixed



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103177052
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    +			return false;
    +		}
    +		if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    +			return false;
    +		}
    +
    +		// check ports
    +		if (srcUri.getPort() != dstUri.getPort()) {
    +			return false;
    +		}
    +
    +		// check ip
    +		String srcHost = srcUri.getHost();
    +		String dstHost = dstUri.getHost();
    +		if ((srcHost != null) && (dstHost != null)) {
    +			if (!srcHost.equals(dstHost)) {
    +				try {
    +					srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
    +					dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
    +				} catch (UnknownHostException ue) {
    +					return false;
    +				}
    +				if (!srcHost.equals(dstHost)) {
    +					return false;
    +				}
    +			}
    +		} else if (srcHost == null && dstHost != null) {
    +			return false;
    +		} else if (srcHost != null && dstHost == null) {
    +			return false;
    +		}
    --- End diff --
    
    Can't we simplify the check by checking if `srcHost != null` without `dstHost != null`? Then we would have only one else branch where we check `dstHost == null`.
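One way the suggested restructuring could look, as a sketch: the host comparison is pulled into a hypothetical `hostsMatch` helper, and the canonical-host-name resolution from the diff is kept unchanged.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class HostCheck {

    /** One branch for a present srcHost, one else-branch that only looks at dstHost. */
    public static boolean hostsMatch(String srcHost, String dstHost) {
        if (srcHost != null) {
            if (dstHost == null) {
                return false;
            }
            return srcHost.equals(dstHost) || resolvedEqual(srcHost, dstHost);
        }
        return dstHost == null;
    }

    // Same resolution as in the diff: compare canonical host names.
    private static boolean resolvedEqual(String a, String b) {
        try {
            return InetAddress.getByName(a).getCanonicalHostName()
                .equals(InetAddress.getByName(b).getCanonicalHostName());
        } catch (UnknownHostException e) {
            return false;
        }
    }
}
```

The explicit `dstHost == null` check inside the first branch still matters: `InetAddress.getByName(null)` returns the loopback address, so without it a null host could resolve to localhost and compare equal.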



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878546
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileUtilsTest.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.test.util.AbstractTestBase;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.*;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Created by wenlong.lwl on 2017/2/21.
    + */
    +public class FileUtilsTest {
    +	@ClassRule
    +	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File TEMP_DIR;
    +
    +	private static String HDFS_ROOT_URI;
    +
    +	private static MiniDFSCluster HDFS_CLUSTER;
    +
    +	private static FileSystem FS;
    +
    +	// ------------------------------------------------------------------------
    +	//  startup / shutdown
    +	// ------------------------------------------------------------------------
    +
    +	@BeforeClass
    +	public static void createHDFS() {
    +		try {
    +			TEMP_DIR = temporaryFolder.newFolder();
    +
    +			Configuration hdConf = new Configuration();
    +			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEMP_DIR.getAbsolutePath());
    +			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
    +			HDFS_CLUSTER = builder.build();
    +
    +			HDFS_ROOT_URI = "hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":"
    +				+ HDFS_CLUSTER.getNameNodePort() + "/";
    +
    +			FS = FileSystem.get(new URI(HDFS_ROOT_URI));
    +		}
    +		catch (Exception e) {
    +			e.printStackTrace();
    +			fail("Could not create HDFS mini cluster " + e.getMessage());
    +		}
    +	}
    +
    +	@AfterClass
    +	public static void destroyHDFS() {
    +		try {
    +			HDFS_CLUSTER.shutdown();
    +		}
    +		catch (Exception ignored) {}
    +	}
    +
    +	@Test
    +	public void testCompareFs() throws IOException {
    +		File localDir = temporaryFolder.newFolder();
    +		Assert.assertFalse(FileUtils.compareFs(new Path(localDir.toURI()).getFileSystem(), FS));
    --- End diff --
    
    This just reuses the HDFS mini cluster that we already create for testing file localization.



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878413
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    +			return false;
    +		}
    +		if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    +			return false;
    +		}
    +
    +		// check ports
    +		if (srcUri.getPort() != dstUri.getPort()) {
    +			return false;
    +		}
    +
    +		// check ip
    +		String srcHost = srcUri.getHost();
    +		String dstHost = dstUri.getHost();
    +		if ((srcHost != null) && (dstHost != null)) {
    +			if (!srcHost.equals(dstHost)) {
    +				try {
    +					srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
    +					dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
    +				} catch (UnknownHostException ue) {
    +					return false;
    +				}
    +				if (!srcHost.equals(dstHost)) {
    +					return false;
    +				}
    +			}
    +		} else if (srcHost == null && dstHost != null) {
    +			return false;
    +		} else if (srcHost != null && dstHost == null) {
    +			return false;
    +		}
    +		return true;
    +	}
    +
    +	/**
    +	 * Check where the original file belongs to the same filesystem as the local dir, if not copy the remote file to
    +	 * local dir. If the original File uri has a fragment, the fragment will be used as local file name.
    +	 *
    +	 * @param localDir      local directory to store remote files.
    +	 * @param originalFile  original file to check
    +	 * @return
    +	 * @throws IOException
    --- End diff --
    
    fixed



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103181903
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    --- End diff --
    
    Don't we compare two URIs instead of two filesystems here? The only purpose we pass `srcFs` and `destFs` to this method is to obtain their URIs.
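If the method only needs URIs, its signature could take them directly, with callers passing `srcFs.getUri()` and `destFs.getUri()`. A sketch under that assumption; `UriComparison.sameFileSystem` is a hypothetical name, and for brevity the host check here is a plain string comparison without the DNS resolution from the diff.

```java
import java.net.URI;

final class UriComparison {

    /** Hypothetical URI-based variant of compareFs. */
    public static boolean sameFileSystem(URI src, URI dst) {
        // Same scheme check as the diff: a missing source scheme never matches.
        if (src.getScheme() == null || !src.getScheme().equals(dst.getScheme())) {
            return false;
        }
        if (src.getPort() != dst.getPort()) {
            return false;
        }
        String srcHost = src.getHost();
        String dstHost = dst.getHost();
        // Both hosts absent counts as a match; otherwise compare names only.
        return srcHost == null ? dstHost == null : srcHost.equalsIgnoreCase(dstHost);
    }
}
```

Taking URIs also makes the method easy to unit-test without instantiating any file system.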



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103197098
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
    @@ -136,21 +137,36 @@ public static Path setupLocalResource(
     			Path homedir) throws IOException {
     
     		// copy resource to HDFS
    -		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
    -
    -		Path dst = new Path(homedir, suffix);
    +		Path dst = getRemoteResourceRoot(appId, localRsrcPath, homedir);
     
     		LOG.info("Copying from " + localRsrcPath + " to " + dst);
     		fs.copyFromLocalFile(localRsrcPath, dst);
     		registerLocalResource(fs, dst, appMasterJar);
     		return dst;
     	}
     
    +	public static Path getRemoteResourceRoot(
    +			String appId, Path localRsrcPath,
    +			Path homedir) throws IOException {
    --- End diff --
    
    Isn't `homedir` better called `targetDir`?



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878350
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    +			return false;
    +		}
    +		if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    +			return false;
    +		}
    +
    +		// check ports
    +		if (srcUri.getPort() != dstUri.getPort()) {
    +			return false;
    +		}
    +
    +		// check ip
    +		String srcHost = srcUri.getHost();
    +		String dstHost = dstUri.getHost();
    +		if ((srcHost != null) && (dstHost != null)) {
    +			if (!srcHost.equals(dstHost)) {
    +				try {
    +					srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
    +					dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
    --- End diff --
    
    I think comparing FQDNs is better when multiple services are running on the same machine. What do you think?



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103194453
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    +			return false;
    +		}
    +		if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    +			return false;
    +		}
    +
    +		// check ports
    +		if (srcUri.getPort() != dstUri.getPort()) {
    +			return false;
    +		}
    +
    +		// check ip
    +		String srcHost = srcUri.getHost();
    +		String dstHost = dstUri.getHost();
    +		if ((srcHost != null) && (dstHost != null)) {
    +			if (!srcHost.equals(dstHost)) {
    +				try {
    +					srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
    +					dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
    +				} catch (UnknownHostException ue) {
    +					return false;
    +				}
    +				if (!srcHost.equals(dstHost)) {
    +					return false;
    +				}
    +			}
    +		} else if (srcHost == null && dstHost != null) {
    +			return false;
    +		} else if (srcHost != null && dstHost == null) {
    +			return false;
    +		}
    +		return true;
    +	}
    +
    +	/**
    +	 * Check where the original file belongs to the same filesystem as the local dir, if not copy the remote file to
    +	 * local dir. If the original File uri has a fragment, the fragment will be used as local file name.
    +	 *
    +	 * @param localDir      local directory to store remote files.
    +	 * @param originalFile  original file to check
    +	 * @return
    +	 * @throws IOException
    +	 */
    +	public static Path localizeRemoteFiles(Path localDir, URI originalFile) throws IOException {
    +
    +		Path originalPath = new Path(originalFile);
    +
    +		FileSystem remoteFs = originalPath.getFileSystem();
    +		FileSystem localFs = localDir.getFileSystem();
    +		if (compareFs(remoteFs, localFs)) {
    +			return originalPath;
    +		}
    +
    +		String fragment = originalFile.getFragment();
    +		if(fragment == null) {
    +			fragment = originalPath.getName();
    +		}
    +		Path newPath = new Path(localDir, fragment);
    +		IOUtils.copyBytes(remoteFs.open(originalPath), localFs.create(newPath, FileSystem.WriteMode.OVERWRITE), true);
    --- End diff --
    
    Maybe we could issue a warning if we try to overwrite a file?
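A sketch of the suggested warning, written against `java.nio.file` and `java.util.logging` rather than Flink's `FileSystem` and logger so that it runs standalone; the class and method names are hypothetical. The existence check and the copy are not atomic, so the warning is best-effort.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.logging.Logger;

final class LocalizeWithWarning {

    private static final Logger LOG = Logger.getLogger(LocalizeWithWarning.class.getName());

    /** Copies in to target, logging a warning if an existing file is replaced. Returns true if it overwrote. */
    public static boolean copyWarnOnOverwrite(InputStream in, Path target) throws IOException {
        boolean exists = Files.exists(target);
        if (exists) {
            LOG.warning("Overwriting existing file " + target);
        }
        // Close the source stream once the copy is done, like copyBytes(..., true) in the diff.
        try (InputStream src = in) {
            Files.copy(src, target, StandardCopyOption.REPLACE_EXISTING);
        }
        return exists;
    }
}
```

Returning whether a file was replaced also lets the caller decide to fail instead of only warning.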



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103196433
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -742,6 +755,24 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
     			}
     		}
     
    +		for (URI originURI : archives) {
    +			Path remoteParent = Utils.getRemoteResourceRoot(appId.toString(), new Path(originURI.getPath()), fs.getHomeDirectory());
    +			String fragment = originURI.getFragment();
    +			Path target = new Path(
    +				FileUtils.localizeRemoteFiles(new org.apache.flink.core.fs.Path(remoteParent.toUri()), originURI).toUri());
    +			URI targetURI = target.toUri();
    +			if (targetURI.getFragment() == null && fragment != null) {
    +				targetURI = new URI(target.toUri().toString() + "#" + fragment);
    +			}
    +			LocalResource archive = Records.newRecord(LocalResource.class);
    +			FileStatus state = fs.getFileStatus(target);
    +			Utils.registerLocalResource(targetURI, state.getLen(), state.getModificationTime(), archive);
    +			localResources.put(fragment, archive);
    --- End diff --
    
    What if `fragment == null`? Or is `fragment` always not `null`?
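If the fragment can be absent, one option is to fall back to the file name, as `localizeRemoteFiles` already does with `originalPath.getName()` when it picks the local copy's name. A sketch with a hypothetical `linkName` helper that would supply the key for `localResources` and the classpath entry:

```java
import java.net.URI;

final class LinkNames {

    /** Uses the URI fragment when present, otherwise the last path segment. */
    public static String linkName(URI archive) {
        String fragment = archive.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment;
        }
        String path = archive.getPath();
        if (path == null || path.isEmpty() || path.endsWith("/")) {
            // No usable file name: fail early instead of registering a null key.
            throw new IllegalArgumentException("Cannot derive a file name from " + archive);
        }
        return path.substring(path.lastIndexOf('/') + 1);
    }
}
```

Deriving the name once and reusing it for both `localResources.put` and the classpath keeps the two consistent.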



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878375
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    +			return false;
    +		}
    +		if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    +			return false;
    +		}
    +
    +		// check ports
    +		if (srcUri.getPort() != dstUri.getPort()) {
    +			return false;
    +		}
    +
    +		// check ip
    +		String srcHost = srcUri.getHost();
    +		String dstHost = dstUri.getHost();
    +		if ((srcHost != null) && (dstHost != null)) {
    +			if (!srcHost.equals(dstHost)) {
    +				try {
    +					srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
    +					dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
    +				} catch (UnknownHostException ue) {
    +					return false;
    +				}
    +				if (!srcHost.equals(dstHost)) {
    +					return false;
    +				}
    +			}
    +		} else if (srcHost == null && dstHost != null) {
    +			return false;
    +		} else if (srcHost != null && dstHost == null) {
    +			return false;
    +		}
    --- End diff --
    
    ok, fixing this.



[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103196289
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -742,6 +755,24 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
     			}
     		}
     
    +		for (URI originURI : archives) {
    +			Path remoteParent = Utils.getRemoteResourceRoot(appId.toString(), new Path(originURI.getPath()), fs.getHomeDirectory());
    +			String fragment = originURI.getFragment();
    +			Path target = new Path(
    +				FileUtils.localizeRemoteFiles(new org.apache.flink.core.fs.Path(remoteParent.toUri()), originURI).toUri());
    --- End diff --
    
    Maybe `FileUtils.localizeRemoteFiles` should return a `URI` if the returned `Path` has to be unpacked and then again packed into another `Path` type.


[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103200589
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java ---
    @@ -85,6 +89,10 @@ public FlinkYarnCLI(String shortPrefix, String longPrefix) {
     		DETACHED = new Option(shortPrefix + "a", longPrefix + "attached", false, "Start attached");
     		ZOOKEEPER_NAMESPACE = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
     
    +		LIB_JARS = new Option(shortPrefix + "libjars", longPrefix + "libjars", true, "Jar file paths for job, like /home/user/lib/test.jar");
    +		FILES = new Option(shortPrefix + "files", longPrefix + "files", true, "Normal file paths for job, like /home/user/lib/test.dict");
    +		ARCHIVES = new Option(shortPrefix + "archives", longPrefix + "archives", true, "Archived file uris for job, like hdfs:///users/flink/common_dict#dict");
    --- End diff --
    
    I'm wondering whether we should also offer these options for the standalone and Mesos cases. We could use the BlobStore to distribute the files.


[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103196883
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
    @@ -136,21 +137,36 @@ public static Path setupLocalResource(
     			Path homedir) throws IOException {
     
     		// copy resource to HDFS
    -		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
    -
    -		Path dst = new Path(homedir, suffix);
    +		Path dst = getRemoteResourceRoot(appId, localRsrcPath, homedir);
     
     		LOG.info("Copying from " + localRsrcPath + " to " + dst);
     		fs.copyFromLocalFile(localRsrcPath, dst);
     		registerLocalResource(fs, dst, appMasterJar);
     		return dst;
     	}
     
    +	public static Path getRemoteResourceRoot(
    +			String appId, Path localRsrcPath,
    --- End diff --
    
    Please put every parameter on a separate line.
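For reference, here is the requested formatting applied to the extracted helper. This is a sketch. It uses `java.nio.file.Path` as a stand-in for `org.apache.hadoop.fs.Path`, and the class name is hypothetical. The body reproduces the `<homedir>/.flink/<appId>/<file name>` layout from the lines removed in the diff.

```java
import java.nio.file.Path;

final class RemoteResourceRootSketch {

	/**
	 * Destination of a shipped file below the user's home directory:
	 * {@code <homedir>/.flink/<appId>/<file name>}, as previously computed inline.
	 */
	static Path getRemoteResourceRoot(
			String appId,
			Path localRsrcPath,
			Path homedir) {
		return homedir.resolve(".flink").resolve(appId).resolve(localRsrcPath.getFileName());
	}
}
```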


[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103195228
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    +		URI srcUri = srcFs.getUri();
    +		URI dstUri = destFs.getUri();
    +
    +		// check schema
    +		if (srcUri.getScheme() == null) {
    +			return false;
    +		}
    +		if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    +			return false;
    +		}
    +
    +		// check ports
    +		if (srcUri.getPort() != dstUri.getPort()) {
    +			return false;
    +		}
    +
    +		// check ip
    +		String srcHost = srcUri.getHost();
    +		String dstHost = dstUri.getHost();
    +		if ((srcHost != null) && (dstHost != null)) {
    +			if (!srcHost.equals(dstHost)) {
    +				try {
    +					srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
    +					dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
    +				} catch (UnknownHostException ue) {
    +					return false;
    +				}
    +				if (!srcHost.equals(dstHost)) {
    +					return false;
    +				}
    +			}
    +		} else if (srcHost == null && dstHost != null) {
    +			return false;
    +		} else if (srcHost != null && dstHost == null) {
    +			return false;
    +		}
    +		return true;
    +	}
    +
    +	/**
    +	 * Check where the original file belongs to the same filesystem as the local dir, if not copy the remote file to
    +	 * local dir. If the original File uri has a fragment, the fragment will be used as local file name.
    +	 *
    +	 * @param localDir      local directory to store remote files.
    +	 * @param originalFile  original file to check
    +	 * @return
    +	 * @throws IOException
    +	 */
    +	public static Path localizeRemoteFiles(Path localDir, URI originalFile) throws IOException {
    --- End diff --
    
    Wouldn't `localizeRemoteFile` be a better name, since it only copies one file?


[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878313
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
    @@ -258,7 +261,82 @@ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws
     			return false;
     		}
     	}
    -	
    +
    +	/**
    +	 * Check whether the two given filesystem is the same or not
    +	 *
    +	 * @param srcFs
    +	 * @param destFs
    +	 * @return
    +	 */
    +	public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
    --- End diff --
    
    Here I only want to check whether the two file systems are the same. A function that compares two URIs would have to compare additional elements as well.
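Two URIs that point at different files on the same namenode are unequal as URIs, yet they live on the same file system. The sketch below illustrates this by comparing only the file-system components (scheme, port, host), in the same order as `compareFs`. It omits the DNS canonicalization step. `sameFileSystem` is a hypothetical name and is not part of the patch.

```java
import java.net.URI;
import java.util.Objects;

final class FileSystemIdentitySketch {

	/** Compares only the parts of a URI that identify a file system; path, query and fragment are ignored. */
	static boolean sameFileSystem(URI a, URI b) {
		// A URI without a scheme does not identify a file system on its own.
		if (a.getScheme() == null || !a.getScheme().equals(b.getScheme())) {
			return false;
		}
		// getPort() returns -1 when no port is given, so two port-less URIs still match.
		if (a.getPort() != b.getPort()) {
			return false;
		}
		// Hosts must match exactly; both may be null, e.g. for file:/// URIs.
		return Objects.equals(a.getHost(), b.getHost());
	}
}
```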


[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103177980
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileUtilsTest.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.test.util.AbstractTestBase;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.*;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Created by wenlong.lwl on 2017/2/21.
    + */
    +public class FileUtilsTest {
    +	@ClassRule
    +	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File TEMP_DIR;
    +
    +	private static String HDFS_ROOT_URI;
    +
    +	private static MiniDFSCluster HDFS_CLUSTER;
    +
    +	private static FileSystem FS;
    +
    +	// ------------------------------------------------------------------------
    +	//  startup / shutdown
    +	// ------------------------------------------------------------------------
    +
    +	@BeforeClass
    +	public static void createHDFS() {
    +		try {
    +			TEMP_DIR = temporaryFolder.newFolder();
    +
    +			Configuration hdConf = new Configuration();
    +			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEMP_DIR.getAbsolutePath());
    +			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
    +			HDFS_CLUSTER = builder.build();
    +
    +			HDFS_ROOT_URI = "hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":"
    +				+ HDFS_CLUSTER.getNameNodePort() + "/";
    +
    +			FS = FileSystem.get(new URI(HDFS_ROOT_URI));
    +		}
    +		catch (Exception e) {
    +			e.printStackTrace();
    +			fail("Could not create HDFS mini cluster " + e.getMessage());
    --- End diff --
    
    We used to follow this pattern, but nowadays it's better to let the exception bubble up and simply declare a `throws` clause.
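Here is a minimal stdlib sketch of the recommended pattern. The setup method declares the checked exception instead of catching it, printing the stack trace and calling `fail(...)`. The test runner then reports the original exception with its full cause chain. In the actual test this would be the `@BeforeClass` method declared with `throws Exception`. Here, `Files.createDirectory` merely stands in for starting the `MiniDFSCluster`. `SetupSketch` and `createBaseDir` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class SetupSketch {

	static Path baseDir;

	/**
	 * Creates the base directory for the test. Any IOException propagates unchanged to the caller
	 * (the test runner, in a real JUnit test) instead of being reduced to a fail(...) message.
	 */
	static void createBaseDir(Path parent) throws IOException {
		baseDir = Files.createDirectory(parent.resolve("hdfs-base"));
	}
}
```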


[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878523
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileUtilsTest.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.test.util.AbstractTestBase;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.*;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Created by wenlong.lwl on 2017/2/21.
    + */
    +public class FileUtilsTest {
    +	@ClassRule
    +	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File TEMP_DIR;
    +
    +	private static String HDFS_ROOT_URI;
    +
    +	private static MiniDFSCluster HDFS_CLUSTER;
    +
    +	private static FileSystem FS;
    +
    +	// ------------------------------------------------------------------------
    +	//  startup / shutdown
    +	// ------------------------------------------------------------------------
    +
    +	@BeforeClass
    +	public static void createHDFS() {
    +		try {
    +			TEMP_DIR = temporaryFolder.newFolder();
    +
    +			Configuration hdConf = new Configuration();
    +			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEMP_DIR.getAbsolutePath());
    +			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
    +			HDFS_CLUSTER = builder.build();
    +
    +			HDFS_ROOT_URI = "hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":"
    +				+ HDFS_CLUSTER.getNameNodePort() + "/";
    +
    +			FS = FileSystem.get(new URI(HDFS_ROOT_URI));
    +		}
    +		catch (Exception e) {
    +			e.printStackTrace();
    +			fail("Could not create HDFS mini cluster " + e.getMessage());
    --- End diff --
    
    ok


[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103878642
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -248,6 +253,14 @@ public void addShipFiles(List<File> shipFiles) {
     		}
     	}
     
    +	public void addArchives(String archive) {
    +		try {
    +			this.archives.add(new URI(archive));
    +		} catch (URISyntaxException e) {
    +			throw new IllegalArgumentException(e);
    --- End diff --
    
    fixed
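For reference, here is a self-contained sketch of the method from the diff, assuming the archive list holds `java.net.URI` values. The diff itself only wraps the `URISyntaxException`. Naming the rejected value in the exception message is an illustrative assumption. `ArchiveOptionSketch` and `getArchives` are hypothetical names.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

final class ArchiveOptionSketch {

	private final List<URI> archives = new ArrayList<>();

	/** Parses one archive URI, e.g. hdfs:///users/flink/common_dict#dict, and records it. */
	void addArchives(String archive) {
		try {
			this.archives.add(new URI(archive));
		} catch (URISyntaxException e) {
			// Name the offending value; the cause keeps the parser's position and reason.
			throw new IllegalArgumentException("Invalid archive URI: " + archive, e);
		}
	}

	List<URI> getArchives() {
		return archives;
	}
}
```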


[GitHub] flink pull request #3388: [FLINK-5815] Add resource files configuration for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3388#discussion_r103177702
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileUtilsTest.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.test.util.AbstractTestBase;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.*;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Created by wenlong.lwl on 2017/2/21.
    --- End diff --
    
    We don't add comments like this.

