You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/04/08 09:53:16 UTC

[3/3] flink git commit: [FLINK-1840] Fix BLOB manager on Windows

[FLINK-1840] Fix BLOB manager on Windows

This closes #578


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0afed4dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0afed4dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0afed4dc

Branch: refs/heads/master
Commit: 0afed4dcf9f9639fc90a8f18933be0516b9d7cd3
Parents: cc7eda1
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Apr 7 23:04:59 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Apr 8 09:52:15 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/blob/BlobServerConnection.java       | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0afed4dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 50b1f24..4f242a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -30,6 +30,7 @@ import java.net.SocketException;
 import java.security.MessageDigest;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.shaded.com.google.common.io.Files;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -307,24 +308,18 @@ class BlobServerConnection extends Thread {
 					md.update(buf, 0, bytesExpected);
 				}
 			}
-
+			fos.close();
 
 			if (contentAddressable == NAME_ADDRESSABLE) {
 				File storageFile = this.blobServer.getStorageLocation(jobID, key);
-				if (!incomingFile.renameTo(storageFile)) {
-					throw new IOException(String.format("Cannot move staging file %s to BLOB file %s",
-							incomingFile.getAbsolutePath(), storageFile.getAbsolutePath()));
-				}
+				Files.move(incomingFile, storageFile);
 				incomingFile = null;
 				outputStream.write(RETURN_OKAY);
 			}
 			else {
 				BlobKey blobKey = new BlobKey(md.digest());
 				File storageFile = blobServer.getStorageLocation(blobKey);
-				if (!incomingFile.renameTo(storageFile)) {
-					throw new IOException(String.format("Cannot move staging file %s to BLOB file %s",
-							incomingFile.getAbsolutePath(), storageFile.getAbsolutePath()));
-				}
+				Files.move(incomingFile, storageFile);
 				incomingFile = null;
 
 				// Return computed key to client for validation