Posted to issues@flink.apache.org by dawidwys <gi...@git.apache.org> on 2018/05/31 13:12:01 UTC

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

GitHub user dawidwys opened a pull request:

    https://github.com/apache/flink/pull/6107

    [FLINK-9366] DistributedCache works with Distributed File System

    ## What is the purpose of the change
    
    Allows using the DistributedCache to cache files from a cluster-internal DFS (restores the previously reverted behaviour).
    
    ## Brief change log
    
      - Local files (identified by their URI scheme) are distributed via the BlobServer
      - Files already in a DFS are cached directly from the DFS (see the usage sketch below)
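    
    A minimal usage sketch of this behaviour (the paths and the `hdfs://namenode:8020` address below are placeholders, not taken from this PR): a file registered under a local URI is shipped through the BlobServer, while a file registered under a DFS URI is read from that DFS by the TaskManagers.
    
        import org.apache.flink.api.common.functions.RichMapFunction;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        
        import java.io.File;
        
        public class CachedFileExample {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
                // local file: distributed to the TaskManagers via the BlobServer
                env.registerCachedFile("file:///tmp/local-lookup.csv", "local_lookup");
        
                // file already in a DFS: cached directly from the DFS
                env.registerCachedFile("hdfs://namenode:8020/data/dfs-lookup.csv", "dfs_lookup");
        
                env.fromElements(1)
                    .map(new RichMapFunction<Integer, String>() {
                        @Override
                        public String map(Integer value) throws Exception {
                            // both entries are available under a local path at runtime
                            File lookup = getRuntimeContext().getDistributedCache().getFile("dfs_lookup");
                            return lookup.getAbsolutePath();
                        }
                    })
                    .print();
        
                env.execute("DistributedCache example");
            }
        }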
    
    ## Verifying this change
    
    * Added test for distributing files through DFS: org.apache.flink.hdfstests.DistributedCacheDfsTest
    * shipping of local files is covered by existing tests
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no 
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no 
      - The S3 file system connector: no 
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dawidwys/flink FLINK-9366

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6107.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6107
    
----
commit cc1344d18e2b1d9953e21d5189f2813fdabd7b01
Author: Dawid Wysakowicz <dw...@...>
Date:   2018-05-31T12:35:44Z

    [FLINK-9366] DistributedCache works with Distributed File System

----


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6107


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r193341492
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
    @@ -844,7 +844,7 @@ public JobExecutionResult execute() throws Exception {
     	/**
     	 * Registers a file at the distributed cache under the given name. The file will be accessible
     	 * from any user-defined function in the (distributed) runtime under a local path. Files
    -	 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
    +	 * may be local files (which will be distributed viaBlobServer), or files in a distributed file system.
    --- End diff --
    
    missing space after via (several instances)


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r192234433
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.test.util.MiniClusterResource;
    +import org.apache.flink.util.NetUtils;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests for distributing files with {@link org.apache.flink.api.common.cache.DistributedCache} via HDFS.
    + */
    +public class DistributedCacheDfsTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +		+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +		+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +		+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +		+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +		+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +		+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +		+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +		+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +		+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +		+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +		+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +		+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +		+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@ClassRule
    +	public static TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	private static MiniClusterResource miniClusterResource;
    +	private static MiniDFSCluster hdfsCluster;
    +	private static Configuration conf = new Configuration();
    +
    +	private static Path testFile;
    +	private static Path testDir;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		File dataDir = tempFolder.newFolder();
    +
    +		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
    +		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
    +		hdfsCluster = builder.build();
    +
    +		String hdfsURI = "hdfs://"
    +			+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
    +			+ "/";
    +
    +		miniClusterResource = new MiniClusterResource(
    --- End diff --
    
    Is it necessary that the Flink cluster is started after the DFS cluster? Otherwise you could use this as a JUnit `Rule`.


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r194703701
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.test.util.MiniClusterResource;
    +import org.apache.flink.util.NetUtils;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests for distributing files with {@link org.apache.flink.api.common.cache.DistributedCache} via HDFS.
    + */
    +public class DistributedCacheDfsTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +		+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +		+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +		+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +		+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +		+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +		+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +		+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +		+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +		+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +		+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +		+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +		+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +		+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@ClassRule
    +	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    +
    +	@ClassRule
    +	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
    +		new MiniClusterResource.MiniClusterResourceConfiguration(
    +			new org.apache.flink.configuration.Configuration(),
    +			1,
    +			1));
    +
    +	private static MiniDFSCluster hdfsCluster;
    +	private static Configuration conf = new Configuration();
    +
    +	private static Path testFile;
    +	private static Path testDir;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		File dataDir = TEMP_FOLDER.newFolder();
    +
    +		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
    +		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
    +		hdfsCluster = builder.build();
    +
    +		String hdfsURI = "hdfs://"
    +			+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
    +			+ "/";
    +
    +		FileSystem dfs = FileSystem.get(new URI(hdfsURI));
    +		testFile = writeFile(dfs, dfs.getHomeDirectory(), "testFile");
    +
    +		testDir = new Path(dfs.getHomeDirectory(), "testDir");
    +		dfs.mkdirs(testDir);
    +		writeFile(dfs, testDir, "testFile1");
    +		writeFile(dfs, testDir, "testFile2");
    +	}
    +
    +	private static Path writeFile(FileSystem dfs, Path rootDir, String fileName) throws IOException {
    +		Path file = new Path(rootDir, fileName);
    +		try (
    +			DataOutputStream outStream = new DataOutputStream(dfs.create(file,
    +				FileSystem.WriteMode.OVERWRITE))) {
    +			outStream.writeUTF(testFileContent);
    +		}
    +		return file;
    +	}
    +
    +	@AfterClass
    +	public static void teardown() {
    +		hdfsCluster.shutdown();
    +	}
    +
    +	@Test
    +	public void testDistributeFileViaDFS() throws Exception {
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +
    +		env.registerCachedFile(testFile.toString(), "test_data", false);
    +		env.registerCachedFile(testDir.toString(), "test_dir", false);
    +
    +		env.fromElements(1)
    +			.map(new TestMapFunction())
    +			.print();
    +
    +		env.execute("Distributed Cache Via Blob Test Program");
    +	}
    +
    +	static class TestMapFunction extends RichMapFunction<Integer, String> {
    --- End diff --
    
    can we make this private?


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r192334113
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.test.util.MiniClusterResource;
    +import org.apache.flink.util.NetUtils;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests for distributing files with {@link org.apache.flink.api.common.cache.DistributedCache} via HDFS.
    + */
    +public class DistributedCacheDfsTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +		+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +		+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +		+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +		+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +		+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +		+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +		+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +		+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +		+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +		+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +		+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +		+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +		+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@ClassRule
    +	public static TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	private static MiniClusterResource miniClusterResource;
    +	private static MiniDFSCluster hdfsCluster;
    +	private static Configuration conf = new Configuration();
    +
    +	private static Path testFile;
    +	private static Path testDir;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		File dataDir = tempFolder.newFolder();
    +
    +		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
    +		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
    +		hdfsCluster = builder.build();
    +
    +		String hdfsURI = "hdfs://"
    +			+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
    +			+ "/";
    +
    +		miniClusterResource = new MiniClusterResource(
    --- End diff --
    
    It is not necessary. Changed it.


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r192695040
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.test.util.MiniClusterResource;
    +import org.apache.flink.util.NetUtils;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests for distributing files with {@link org.apache.flink.api.common.cache.DistributedCache} via HDFS.
    + */
    +public class DistributedCacheDfsTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +		+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +		+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +		+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +		+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +		+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +		+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +		+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +		+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +		+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +		+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +		+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +		+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +		+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@ClassRule
    +	public static TemporaryFolder tempFolder = new TemporaryFolder();
    --- End diff --
    
    can be final.


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r192691637
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---
    @@ -690,12 +690,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
         f
       }
     
    -
       /**
         * Registers a file at the distributed cache under the given name. The file will be accessible
         * from any user-defined function in the (distributed) runtime under a local path. Files
    -    * may be local files (as long as all relevant workers have access to it), or files in a
    -    * distributed file system. The runtime will copy the files temporarily to a local cache,
    +    * may be local files (will be distributed via BlobServer), or files in a distributed file
    --- End diff --
    
    add `which` before `will` for better flow. Also applies to other instances of this method.


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r194705137
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
    @@ -584,17 +584,25 @@ public void uploadUserArtifacts(InetSocketAddress blobServerAddress, Configurati
     		if (!userArtifacts.isEmpty()) {
     			try (BlobClient blobClient = new BlobClient(blobServerAddress, clientConfig)) {
     				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
    -
    -					final PermanentBlobKey key = blobClient.uploadFile(jobID,
    -						new Path(userArtifact.getValue().filePath));
    -
    -					DistributedCache.writeFileInfoToConfig(
    -						userArtifact.getKey(),
    -						new DistributedCache.DistributedCacheEntry(
    -							userArtifact.getValue().filePath,
    -							userArtifact.getValue().isExecutable,
    -							InstantiationUtil.serializeObject(key)),
    -						jobConfiguration);
    +					Path filePath = new Path(userArtifact.getValue().filePath);
    +
    +					if (filePath.getFileSystem().isDistributedFS()) {
    --- End diff --
    
    The file-system class may not be accessible on the client, so you have to guard this with a try-catch block and write it into the config in both cases.
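    
    A minimal sketch of that guard, written as a hypothetical helper (the name `mustShipViaBlobServer` is not from the PR; `Path` is `org.apache.flink.core.fs.Path`):
    
        import org.apache.flink.core.fs.Path;
        
        import java.io.IOException;
        
        /**
         * If the file system implementation for the artifact's URI scheme cannot be
         * loaded on the client, fall back to shipping the artifact via the BlobServer.
         */
        private static boolean mustShipViaBlobServer(String artifactPath) {
            try {
                return !new Path(artifactPath).getFileSystem().isDistributedFS();
            } catch (IOException e) {
                // e.g. the HDFS file system classes are not on the client classpath
                return true;
            }
        }
    
    In either case the `DistributedCacheEntry` is still written into the job configuration via `DistributedCache.writeFileInfoToConfig(...)`, as the quoted diff does for the BlobServer case.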


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r193340830
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
    @@ -844,7 +844,7 @@ public JobExecutionResult execute() throws Exception {
     	/**
     	 * Registers a file at the distributed cache under the given name. The file will be accessible
     	 * from any user-defined function in the (distributed) runtime under a local path. Files
    -	 * may be local files (will be distributed via BlobServer), or files in a distributed file system.
    +	 * may be local files (which will be distributed viaBlobServer), or files in a distributed file system.
    --- End diff --
    
    missing space `viaBlobServer`


---

[GitHub] flink issue #6107: [FLINK-9366] DistributedCache works with Distributed File...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6107
  
    Looks good functionality-wise, had some minor comments.


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r192718847
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java ---
    @@ -15,8 +15,12 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +
     package org.apache.flink.api.common.cache;
     
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.configuration.Configuration;
    --- End diff --
    
    If you make this file checkstyle-compliant, please also remove the exclusion in `tools/maven/suppressions-core.xml`.


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r192692204
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -302,6 +308,30 @@ public Path call() throws IOException {
     		}
     	}
     
    +	/**
    +	 * Asynchronous file copy process.
    +	 */
    +	private static class CopyFromDFSProcess implements Callable<Path> {
    +
    +		private final Path filePath;
    +		private final Path cachedPath;
    +		private boolean executable;
    +
    +		public CopyFromDFSProcess(DistributedCacheEntry e, Path cachedPath) {
    +			this.filePath = new Path(e.filePath);
    +			this.executable = e.isExecutable;
    +			this.cachedPath = cachedPath;
    +		}
    +
    +		@Override
    +		public Path call() throws IOException {
    +			// let exceptions propagate. we can retrieve them later from
    +			// the future and report them upon access to the result
    +			copy(filePath, cachedPath, this.executable);
    --- End diff --
    
    can you prefix this with `FileUtils`? IMO this makes the code more readable.


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r192721111
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java ---
    @@ -15,8 +15,12 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +
     package org.apache.flink.api.common.cache;
     
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.configuration.Configuration;
    --- End diff --
    
    Sure, I wondered why it did not fail the build before...


---

[GitHub] flink issue #6107: [FLINK-9366] DistributedCache works with Distributed File...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/6107
  
    Could you have a look @zentol @aljoscha ?


---

[GitHub] flink pull request #6107: [FLINK-9366] DistributedCache works with Distribut...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6107#discussion_r192695014
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hdfstests;
    +
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.test.util.MiniClusterResource;
    +import org.apache.flink.util.NetUtils;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests for distributing files with {@link org.apache.flink.api.common.cache.DistributedCache} via HDFS.
    + */
    +public class DistributedCacheDfsTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +		+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +		+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +		+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +		+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +		+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +		+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +		+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +		+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +		+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +		+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +		+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +		+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +		+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@ClassRule
    +	public static TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	@ClassRule
    +	public static MiniClusterResource miniClusterResource  = new MiniClusterResource(
    --- End diff --
    
    can be final, double space before `=`.


---

[GitHub] flink issue #6107: [FLINK-9366] DistributedCache works with Distributed File...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/6107
  
    merging.


---