You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 11:23:44 UTC
[1/2] flink git commit: [hotfix] Let DistributedCacheDfsTest extend TestLogger and avoid stdout output
Repository: flink
Updated Branches:
refs/heads/master a3bc2bdc6 -> 2fd2ccb65
[hotfix] Let DistributedCacheDfsTest extend TestLogger and avoid stdout output
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fd2ccb6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fd2ccb6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fd2ccb6
Branch: refs/heads/master
Commit: 2fd2ccb65fb213f251425265b6319b1e740a471a
Parents: acb8edc
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 4 12:13:17 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 12:13:48 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/hdfstests/DistributedCacheDfsTest.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2fd2ccb6/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
index ed4ec94..4c8440d 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
@@ -23,9 +23,11 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -48,7 +50,7 @@ import static org.junit.Assert.assertTrue;
/**
* Tests for distributing files with {@link org.apache.flink.api.common.cache.DistributedCache} via HDFS.
*/
-public class DistributedCacheDfsTest {
+public class DistributedCacheDfsTest extends TestLogger {
private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
@@ -128,7 +130,7 @@ public class DistributedCacheDfsTest {
env.fromElements(1)
.map(new TestMapFunction())
- .print();
+ .addSink(new DiscardingSink<>());
env.execute("Distributed Cache Via Blob Test Program");
}
[2/2] flink git commit: [FLINK-9622] Do not swallow exceptions in FileUtils#copy
Posted by tr...@apache.org.
[FLINK-9622] Do not swallow exceptions in FileUtils#copy
This closes #6244.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acb8edc2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acb8edc2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acb8edc2
Branch: refs/heads/master
Commit: acb8edc2d42d5f16014d10b7aa42bba2f2f8267f
Parents: a3bc2bd
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 4 12:13:10 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 12:13:48 2018 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/util/FileUtils.java | 41 ++++++++++++--------
.../org/apache/flink/util/FileUtilsTest.java | 2 +-
2 files changed, 25 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/acb8edc2/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 713ed61..23af2e8 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -333,26 +333,33 @@ public final class FileUtils {
FileSystem tFS = FileSystem.getUnguardedFileSystem(targetPath.toUri());
if (!tFS.exists(targetPath)) {
if (sFS.getFileStatus(sourcePath).isDir()) {
- tFS.mkdirs(targetPath);
- FileStatus[] contents = sFS.listStatus(sourcePath);
- for (FileStatus content : contents) {
- String distPath = content.getPath().toString();
- if (content.isDir()) {
- if (distPath.endsWith("/")) {
- distPath = distPath.substring(0, distPath.length() - 1);
- }
- }
- String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/"));
- copy(content.getPath(), new Path(localPath), executable);
- }
+ internalCopyDirectory(sourcePath, targetPath, executable, sFS, tFS);
} else {
- try (FSDataOutputStream lfsOutput = tFS.create(targetPath, FileSystem.WriteMode.NO_OVERWRITE); FSDataInputStream fsInput = sFS.open(sourcePath)) {
- IOUtils.copyBytes(fsInput, lfsOutput);
- //noinspection ResultOfMethodCallIgnored
- new File(targetPath.toString()).setExecutable(executable);
- } catch (IOException ignored) {
+ internalCopyFile(sourcePath, targetPath, executable, sFS, tFS);
+ }
+ }
+ }
+
+ private static void internalCopyDirectory(Path sourcePath, Path targetPath, boolean executable, FileSystem sFS, FileSystem tFS) throws IOException {
+ tFS.mkdirs(targetPath);
+ FileStatus[] contents = sFS.listStatus(sourcePath);
+ for (FileStatus content : contents) {
+ String distPath = content.getPath().toString();
+ if (content.isDir()) {
+ if (distPath.endsWith("/")) {
+ distPath = distPath.substring(0, distPath.length() - 1);
}
}
+ String localPath = targetPath + distPath.substring(distPath.lastIndexOf("/"));
+ copy(content.getPath(), new Path(localPath), executable);
+ }
+ }
+
+ private static void internalCopyFile(Path sourcePath, Path targetPath, boolean executable, FileSystem sFS, FileSystem tFS) throws IOException {
+ try (FSDataOutputStream lfsOutput = tFS.create(targetPath, FileSystem.WriteMode.NO_OVERWRITE); FSDataInputStream fsInput = sFS.open(sourcePath)) {
+ IOUtils.copyBytes(fsInput, lfsOutput);
+ //noinspection ResultOfMethodCallIgnored
+ new File(targetPath.toString()).setExecutable(executable);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acb8edc2/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index b42ee22..23878cb 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -49,7 +49,7 @@ import static org.junit.Assume.assumeTrue;
/**
* Tests for the {@link FileUtils}.
*/
-public class FileUtilsTest {
+public class FileUtilsTest extends TestLogger {
@Rule
public final TemporaryFolder tmp = new TemporaryFolder();