You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/03/05 18:50:16 UTC

flink git commit: [FLINK-1640] Remove tailing slash from paths. Add tests for Path and FileOutputFormat.

Repository: flink
Updated Branches:
  refs/heads/master eae2166dd -> 8f321c729


[FLINK-1640] Remove tailing slash from paths. Add tests for Path and FileOutputFormat.

This closes #453


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f321c72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f321c72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f321c72

Branch: refs/heads/master
Commit: 8f321c7299155990d4e4341f3831a843535219e8
Parents: eae2166
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Mar 4 15:24:26 2015 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Mar 5 17:56:34 2015 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/core/fs/Path.java     |  45 ++-
 .../api/common/io/FileOutputFormatTest.java     | 379 +++++++++++--------
 .../java/org/apache/flink/core/fs/PathTest.java | 142 +++++++
 3 files changed, 390 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f321c72/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index a104d86..30a2a65 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -37,6 +37,8 @@ import org.apache.flink.util.StringUtils;
 /**
  * Names a file or directory in a {@link FileSystem}. Path strings use slash as
  * the directory separator. A path string is absolute if it begins with a slash.
+ *
+ * Tailing slashes are removed from the path.
  */
 public class Path implements IOReadableWritable, Serializable {
 	
@@ -71,7 +73,7 @@ public class Path implements IOReadableWritable, Serializable {
 	 * Constructs a path object from a given URI.
 	 * 
 	 * @param uri
-	 *        the URI to contruct the path object from
+	 *        the URI to construct the path object from
 	 */
 	public Path(URI uri) {
 		this.uri = uri;
@@ -143,20 +145,24 @@ public class Path implements IOReadableWritable, Serializable {
 	}
 
 	/**
-	 * Checks if the provided path string is either null or has zero length and throws
+ 	 * Checks if the provided path string is either null or has zero length and throws
 	 * a {@link IllegalArgumentException} if any of the two conditions apply.
-	 * 
+	 * In addition, leading and tailing whitespaces are removed.
+	 *
 	 * @param path
 	 *        the path string to be checked
+	 * @return The checked and trimmed path.
 	 */
-	private void checkPathArg(String path) {
+	private String checkAndTrimPathArg(String path) {
 		// disallow construction of a Path from an empty string
 		if (path == null) {
 			throw new IllegalArgumentException("Can not create a Path from a null string");
 		}
+		path = path.trim();
 		if (path.length() == 0) {
 			throw new IllegalArgumentException("Can not create a Path from an empty string");
 		}
+		return path;
 	}
 
 	/**
@@ -167,7 +173,7 @@ public class Path implements IOReadableWritable, Serializable {
 	 *        the string to construct a path from
 	 */
 	public Path(String pathString) {
-		checkPathArg(pathString);
+		pathString = checkAndTrimPathArg(pathString);
 
 		// We can't use 'new URI(String)' directly, since it assumes things are
 		// escaped, which we don't require of Paths.
@@ -217,7 +223,7 @@ public class Path implements IOReadableWritable, Serializable {
 	 *        the path string
 	 */
 	public Path(String scheme, String authority, String path) {
-		checkPathArg(path);
+		path = checkAndTrimPathArg(path);
 		initialize(scheme, authority, path);
 	}
 
@@ -247,9 +253,18 @@ public class Path implements IOReadableWritable, Serializable {
 	 * @return the normalized path string
 	 */
 	private String normalizePath(String path) {
-		// remove double slashes & backslashes
-		path = path.replace("//", "/");
+
+		// remove leading and tailing whitespaces
+		path = path.trim();
+
+		// remove consecutive slashes & backslashes
 		path = path.replace("\\", "/");
+		path = path.replaceAll("/+", "/");
+
+		// remove tailing separator
+		if(!path.equals(SEPARATOR) && path.endsWith(SEPARATOR)) {
+			path = path.substring(0, path.length() - SEPARATOR.length());
+		}
 
 		return path;
 	}
@@ -306,23 +321,19 @@ public class Path implements IOReadableWritable, Serializable {
 	}
 
 	/**
-	 * Returns the final component of this path.
+	 * Returns the final component of this path, i.e., everything that follows the last separator.
 	 * 
 	 * @return the final component of the path
 	 */
 	public String getName() {
 		final String path = uri.getPath();
-		if (path.endsWith(SEPARATOR)) {
-			final int slash = path.lastIndexOf(SEPARATOR, path.length() - SEPARATOR.length() - 1);
-			return path.substring(slash + 1, path.length() - SEPARATOR.length());
-		} else {
-			final int slash = path.lastIndexOf(SEPARATOR);
-			return path.substring(slash + 1);
-		}
+		final int slash = path.lastIndexOf(SEPARATOR);
+		return path.substring(slash + 1);
 	}
 
 	/**
-	 * Returns the parent of a path or <code>null</code> if at root.
+	 * Returns the parent of a path, i.e., everything that precedes the last separator
+	 * or <code>null</code> if at root.
 	 * 
 	 * @return the parent of a path or <code>null</code> if at root.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/8f321c72/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
index 43ded56..cc040b6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
@@ -31,20 +31,19 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.types.IntValue;
 import org.junit.Test;
 
+import static org.junit.Assert.fail;
+
 public class FileOutputFormatTest {
 
 	@Test
-	public void testCreateNoneParallelLocalFS() {
-		
+	public void testCreateNonParallelLocalFS() throws IOException {
+
 		File tmpOutPath = null;
 		File tmpOutFile = null;
-		try {
-			tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
-			tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
-		} catch (IOException e) {
-			throw new RuntimeException("Test in error", e);
-		}
-		
+
+		tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
+		tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+
 		String tmpFilePath = tmpOutPath.toURI().toString();
 
 		// check fail if file exists
@@ -54,18 +53,17 @@ public class FileOutputFormatTest {
 		dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
 
 		dfof.configure(new Configuration());
-		
-		boolean exception = false;
+
 		try {
 			dfof.open(0, 1);
 			dfof.close();
+			fail();
 		} catch (Exception e) {
-			exception = true;
+			// exception expected
 		}
-		Assert.assertTrue(exception);
+		tmpOutPath.delete();
 
 		// check fail if directory exists
-		tmpOutPath.delete();
 		Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir());
 
 		dfof = new DummyFileOutputFormat();
@@ -74,57 +72,72 @@ public class FileOutputFormatTest {
 		dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
 
 		dfof.configure(new Configuration());
-		
-		exception = false;
+
 		try {
 			dfof.open(0, 1);
 			dfof.close();
+			fail();
 		} catch (Exception e) {
-			exception = true;
+			// exception expected
 		}
-		Assert.assertTrue(exception);
-		
-		// check success
 		tmpOutPath.delete();
-		
+
+		// check success
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
 		dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
 
 		dfof.configure(new Configuration());
-		
-		exception = false;
+
 		try {
 			dfof.open(0, 1);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
-		
+		tmpOutPath.delete();
+
+		// check fail for path with tailing '/'
+		dfof = new DummyFileOutputFormat();
+		dfof.setOutputFilePath(new Path(tmpFilePath+"/"));
+		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+		dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+		dfof.configure(new Configuration());
+
+		try {
+			dfof.open(0, 1);
+			dfof.close();
+		} catch (Exception e) {
+			fail();
+		}
+		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+		tmpOutPath.delete();
+
 		// ----------- test again with always directory mode
-		
+
 		// check fail if file exists
+		tmpOutPath.createNewFile();
+
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
 		dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
 
 		dfof.configure(new Configuration());
-		
-		exception = false;
+
 		try {
 			dfof.open(0, 1);
 			dfof.close();
+			fail();
 		} catch (Exception e) {
-			exception = true;
+			// exception expected
 		}
-		Assert.assertTrue(exception);
+		tmpOutPath.delete();
 
 		// check success if directory exists
-		tmpOutPath.delete();
 		Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir());
 
 		dfof = new DummyFileOutputFormat();
@@ -133,20 +146,18 @@ public class FileOutputFormatTest {
 		dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
 
 		dfof.configure(new Configuration());
-		
-		exception = false;
+
 		try {
 			dfof.open(0, 1);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+		(new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
 
 		// check custom file name inside directory if directory exists
-		(new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -155,15 +166,13 @@ public class FileOutputFormatTest {
 		Configuration c = new Configuration();
 		dfof.configure(c);
 
-		exception = false;
 		try {
 			dfof.open(0, 1);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
 		File customOutFile = new File(tmpOutPath.getAbsolutePath()+"/fancy-1-0.avro");
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(customOutFile.exists() && customOutFile.isFile());
 		customOutFile.delete();
@@ -171,11 +180,8 @@ public class FileOutputFormatTest {
 		// check fail if file in directory exists
 		// create file for test
 		customOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
-		try {
-			customOutFile.createNewFile();
-		} catch (IOException e) {
-			Assert.fail("Error creating file");
-		}
+		customOutFile.createNewFile();
+
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -183,20 +189,17 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 1);
 			dfof.close();
+			fail();
 		} catch (Exception e) {
-			exception = true;
+			// exception expected
 		}
-		Assert.assertTrue(exception);
-		
-		// check success if no file exists
-		// delete existing files
 		(new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
 		tmpOutPath.delete();
-		
+
+		// check success if no file exists
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -204,35 +207,47 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 1);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-				
-		// clean up
 		(new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
 		tmpOutPath.delete();
-		
+
+		// check success for path with tailing '/'
+		dfof = new DummyFileOutputFormat();
+		dfof.setOutputFilePath(new Path(tmpFilePath+'/'));
+		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+		dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+
+		dfof.configure(new Configuration());
+
+		try {
+			dfof.open(0, 1);
+			dfof.close();
+		} catch (Exception e) {
+			fail();
+		}
+		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
+		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+		(new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
+		tmpOutPath.delete();
+
 	}
 	
 	@Test
-	public void testCreateParallelLocalFS() {
+	public void testCreateParallelLocalFS() throws IOException {
 		
 		File tmpOutPath = null;
 		File tmpOutFile = null;
-		try {
-			tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
-			tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
-		} catch (IOException e) {
-			throw new RuntimeException("Test in error", e);
-		}
-		
+
+		tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
+		tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+
 		String tmpFilePath = tmpOutPath.toURI().toString();
 
 		// check fail if file exists
@@ -243,17 +258,16 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		boolean exception = false;
 		try {
 			dfof.open(0, 2);
 			dfof.close();
+			fail();
 		} catch (Exception e) {
-			exception = true;
+			// exception expected
 		}
-		Assert.assertTrue(exception);
+		tmpOutPath.delete();
 
 		// check success if directory exists
-		tmpOutPath.delete();
 		Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir());
 
 		dfof = new DummyFileOutputFormat();
@@ -263,18 +277,21 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 2);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-		
+		tmpOutFile.delete();
+		tmpOutPath.delete();
+
 		// check fail if file in directory exists
+		tmpOutPath.mkdir();
+		tmpOutFile.createNewFile();
+
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -282,20 +299,17 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 2);
 			dfof.close();
+			fail();
 		} catch (Exception e) {
-			exception = true;
+			// exception expected
 		}
-		Assert.assertTrue(exception);
-		
-		// check success if no file exists
-		// delete existing files
 		tmpOutFile.delete();
 		tmpOutPath.delete();
-		
+
+		// check success if no file exists
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -303,34 +317,47 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 2);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-		
-		// clean up
 		tmpOutFile.delete();
 		tmpOutPath.delete();
+
+		// check success for path with tailing '/'
+		dfof = new DummyFileOutputFormat();
+		dfof.setOutputFilePath(new Path(tmpFilePath+"/"));
+		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+		dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+		dfof.configure(new Configuration());
+
+		try {
+			dfof.open(0, 2);
+			dfof.close();
+		} catch (Exception e) {
+			fail();
+		}
+		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
+		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+		tmpOutFile.delete();
+		tmpOutPath.delete();
+
 	}
 	
 	@Test
-	public void testOverwriteNoneParallelLocalFS() {
+	public void testOverwriteNonParallelLocalFS() throws IOException {
 		
 		File tmpOutPath = null;
 		File tmpOutFile = null;
-		try {
-			tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
-			tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
-		} catch (IOException e) {
-			throw new RuntimeException("Test in error", e);
-		}
-		
+
+		tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
+		tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+
 		String tmpFilePath = tmpOutPath.toURI().toString();
 
 		// check success if file exists
@@ -341,14 +368,12 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		boolean exception = false;
 		try {
 			dfof.open(0, 1);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
 
 		// check success if directory exists
@@ -362,19 +387,16 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 1);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
-		
-		// check success
 		tmpOutPath.delete();
-		
+
+		// check success
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -382,19 +404,37 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 1);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
-		
+		tmpOutPath.delete();
+
+		// check fail for path with tailing '/'
+		dfof = new DummyFileOutputFormat();
+		dfof.setOutputFilePath(new Path(tmpFilePath+"/"));
+		dfof.setWriteMode(WriteMode.OVERWRITE);
+		dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+		dfof.configure(new Configuration());
+
+		try {
+			dfof.open(0, 1);
+			dfof.close();
+		} catch (Exception e) {
+			fail();
+		}
+		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+		tmpOutPath.delete();
+
 		// ----------- test again with always directory mode
 		
 		// check success if file exists
+		tmpOutPath.createNewFile();
+
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -402,20 +442,19 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 1);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
 
-		// check success if directory exists
 		tmpOutFile.delete();
 		tmpOutPath.delete();
+
+		// check success if directory exists
 		Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir());
 
 		dfof = new DummyFileOutputFormat();
@@ -425,18 +464,21 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 1);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-		
+		tmpOutPath.delete();
+		tmpOutFile.delete();
+
 		// check success if file in directory exists
+		tmpOutPath.mkdir();
+		tmpOutFile.createNewFile();
+
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -444,22 +486,18 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 1);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-		
-		// check success if no file exists
-		// delete existing files
-		tmpOutFile.delete();
 		tmpOutPath.delete();
-		
+		tmpOutFile.delete();
+
+		// check success if no file exists
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -467,35 +505,46 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 1);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
+		}
+		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
+		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+		tmpOutFile.delete();
+		tmpOutPath.delete();
+
+		// check success for path with tailing '/'
+		dfof = new DummyFileOutputFormat();
+		dfof.setOutputFilePath(new Path(tmpFilePath+"/"));
+		dfof.setWriteMode(WriteMode.OVERWRITE);
+		dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+
+		dfof.configure(new Configuration());
+
+		try {
+			dfof.open(0, 1);
+			dfof.close();
+		} catch (Exception e) {
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-		
-		// clean up
 		tmpOutFile.delete();
 		tmpOutPath.delete();
-		
 	}
 	
 	@Test
-	public void testOverwriteParallelLocalFS() {
+	public void testOverwriteParallelLocalFS() throws IOException {
 		
 		File tmpOutPath = null;
 		File tmpOutFile = null;
-		try {
-			tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
-			tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
-		} catch (IOException e) {
-			throw new RuntimeException("Test in error", e);
-		}
-		
+
+		tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
+		tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+
 		String tmpFilePath = tmpOutPath.toURI().toString();
 
 		// check success if file exists
@@ -506,20 +555,18 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		boolean exception = false;
 		try {
 			dfof.open(0, 2);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-
-		// check success if directory exists
 		tmpOutFile.delete();
 		tmpOutPath.delete();
+
+		// check success if directory exists
 		Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir());
 
 		dfof = new DummyFileOutputFormat();
@@ -529,18 +576,21 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 2);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-		
+		tmpOutFile.delete();
+		tmpOutPath.delete();
+
 		// check success if file in directory exists
+		tmpOutPath.mkdir();
+		tmpOutFile.createNewFile();
+
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -548,22 +598,18 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 2);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-		
-		// check success if no file exists
-		// delete existing files
 		(new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
 		tmpOutPath.delete();
-		
+
+		// check success if no file exists
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -571,21 +617,36 @@ public class FileOutputFormatTest {
 
 		dfof.configure(new Configuration());
 		
-		exception = false;
 		try {
 			dfof.open(0, 2);
 			dfof.close();
 		} catch (Exception e) {
-			exception = true;
+			fail();
 		}
-		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-		
-		// clean up
 		tmpOutFile.delete();
 		tmpOutPath.delete();
-		
+
+		// check success for path with tailing '/'
+		dfof = new DummyFileOutputFormat();
+		dfof.setOutputFilePath(new Path(tmpFilePath+"/"));
+		dfof.setWriteMode(WriteMode.OVERWRITE);
+		dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+		dfof.configure(new Configuration());
+
+		try {
+			dfof.open(0, 2);
+			dfof.close();
+		} catch (Exception e) {
+			fail();
+		}
+		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
+		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+		tmpOutFile.delete();
+		tmpOutPath.delete();
+
 	}
 	
 	// -------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8f321c72/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
index 7f04fa6..1912cb3 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
@@ -23,6 +23,148 @@ import org.junit.Test;
 import static org.junit.Assert.*;
 
 public class PathTest {
+
+	@Test
+	public void testPathFromString() {
+
+		Path p = new Path("/my/path");
+		assertEquals("/my/path", p.toUri().getPath());
+		assertNull(p.toUri().getScheme());
+
+		p = new Path("/my/path/");
+		assertEquals("/my/path", p.toUri().getPath());
+		assertNull(p.toUri().getScheme());
+
+		p = new Path("/my//path/");
+		assertEquals("/my/path", p.toUri().getPath());
+		assertNull(p.toUri().getScheme());
+
+		p = new Path("/my//path//a///");
+		assertEquals("/my/path/a", p.toUri().getPath());
+		assertNull(p.toUri().getScheme());
+
+		p = new Path("\\my\\path\\\\a\\\\\\");
+		assertEquals("/my/path/a", p.toUri().getPath());
+		assertNull(p.toUri().getScheme());
+
+		p = new Path("/my/path/ ");
+		assertEquals("/my/path", p.toUri().getPath());
+		assertNull(p.toUri().getScheme());
+
+		p = new Path("hdfs:///my/path");
+		assertEquals("/my/path", p.toUri().getPath());
+		assertEquals("hdfs", p.toUri().getScheme());
+
+		p = new Path("hdfs:///my/path/");
+		assertEquals("/my/path", p.toUri().getPath());
+		assertEquals("hdfs", p.toUri().getScheme());
+
+		p = new Path("file:///my/path");
+		assertEquals("/my/path", p.toUri().getPath());
+		assertEquals("file", p.toUri().getScheme());
+
+		try {
+			new Path((String)null);
+			fail();
+		} catch(Exception e) {
+			// exception expected
+		}
+
+		try {
+			new Path("");
+			fail();
+		} catch(Exception e) {
+			// exception expected
+		}
+
+		try {
+			new Path(" ");
+			fail();
+		} catch(Exception e) {
+			// exception expected
+		}
+
+	}
+
+	@Test
+	public void testIsAbsolute() {
+
+		Path p = new Path("/my/abs/path");
+		assertTrue(p.isAbsolute());
+
+		p = new Path("./my/rel/path");
+		assertFalse(p.isAbsolute());
+
+	}
+
+	@Test
+	public void testGetName() {
+
+		Path p = new Path("/my/fancy/path");
+		assertEquals("path", p.getName());
+
+		p = new Path("/my/fancy/path/");
+		assertEquals("path", p.getName());
+
+		p = new Path("hdfs:///my/path");
+		assertEquals("path", p.getName());
+
+		p = new Path("hdfs:///myPath/");
+		assertEquals("myPath", p.getName());
+
+		p = new Path("/");
+		assertEquals("", p.getName());
+
+	}
+
+	@Test
+	public void testGetParent() {
+
+		Path p = new Path("/my/fancy/path");
+		assertEquals("/my/fancy", p.getParent().toUri().getPath());
+
+		p = new Path("/my/other/fancy/path/");
+		assertEquals("/my/other/fancy", p.getParent().toUri().getPath());
+
+		p = new Path("hdfs:///my/path");
+		assertEquals("/my", p.getParent().toUri().getPath());
+
+		p = new Path("hdfs:///myPath/");
+		assertEquals("/", p.getParent().toUri().getPath());
+
+		p = new Path("/");
+		assertNull(p.getParent());
+	}
+
+	@Test
+	public void testSuffix() {
+
+		Path p = new Path("/my/path");
+		p = p.suffix("_123");
+		assertEquals("/my/path_123", p.toUri().getPath());
+
+		p = new Path("/my/path/");
+		p = p.suffix("/abc");
+		assertEquals("/my/path/abc", p.toUri().getPath());
+
+	}
+
+	@Test
+	public void testDepth() {
+
+		Path p = new Path("/my/path");
+		assertEquals(2, p.depth());
+
+		p = new Path("/my/fancy/path/");
+		assertEquals(3, p.depth());
+
+		p = new Path("/my/fancy/fancy/fancy/fancy/fancy/fancy/fancy/fancy/fancy/fancy/path");
+		assertEquals(12, p.depth());
+
+		p = new Path("/");
+		assertEquals(0, p.depth());
+	}
+
 	@Test
 	public void testParsing() {
 		URI u;