You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/06/08 07:16:00 UTC
[3/4] flink git commit: [FLINK-6820] Activate checkstyle for runtime/filecache
[FLINK-6820] Activate checkstyle for runtime/filecache
This closes #4062.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ae4f2b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ae4f2b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ae4f2b0
Branch: refs/heads/master
Commit: 8ae4f2b0ae103540435244ff75ab4b593670dd76
Parents: 92cd736
Author: zentol <ch...@apache.org>
Authored: Fri Jun 2 21:06:16 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Jun 7 23:06:08 2017 +0200
----------------------------------------------------------------------
flink-runtime/pom.xml | 1 -
.../flink/runtime/filecache/FileCache.java | 36 +++++++++---------
.../FileCacheDeleteValidationTest.java | 39 ++++++++++----------
3 files changed, 36 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 3cee8d8..602f788 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -430,7 +430,6 @@ under the License.
**/runtime/deployment/**,
**/runtime/execution/**,
**/runtime/executiongraph/**,
- **/runtime/filecache/**,
**/runtime/fs/**,
**/runtime/heartbeat/**,
**/runtime/highavailability/**,
http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 4f2166f..84b8feb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.filecache;
-import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.java.tuple.Tuple4;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +54,8 @@ import java.util.concurrent.TimeUnit;
public class FileCache {
static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
-
- /** cache-wide lock to ensure consistency. copies are not done under this lock */
+
+ /** cache-wide lock to ensure consistency. copies are not done under this lock. */
private final Object lock = new Object();
private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
@@ -99,12 +99,12 @@ public class FileCache {
this.shutdownHook = createShutdownHook(this, LOG);
this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();
- this.executorService = Executors.newScheduledThreadPool(10,
+ this.executorService = Executors.newScheduledThreadPool(10,
new ExecutorThreadFactory("flink-file-cache"));
}
/**
- * Shuts down the file cache by cancelling all
+ * Shuts down the file cache by cancelling all pending work, clearing the entries, and cleaning up the storage directories.
*/
public void shutdown() {
synchronized (lock) {
@@ -119,9 +119,9 @@ public class FileCache {
// may happen
}
}
-
+
entries.clear();
-
+
// clean up all the storage directories
for (File dir : storageDirectories) {
try {
@@ -172,7 +172,7 @@ public class FileCache {
// file is already in the cache. return a future that
// immediately returns the file
fileEntry.f0 = fileEntry.f0 + 1;
-
+
// return the future. may be that the copy is still in progress
return fileEntry.f3;
}
@@ -197,10 +197,10 @@ public class FileCache {
CopyProcess cp = new CopyProcess(entry, target);
FutureTask<Path> copyTask = new FutureTask<Path>(cp);
executorService.submit(copyTask);
-
+
// store our entry
jobEntries.put(name, new Tuple4<Integer, File, Path, Future<Path>>(1, tempDirToUse, target, copyTask));
-
+
return copyTask;
}
}
@@ -216,8 +216,7 @@ public class FileCache {
DeleteProcess dp = new DeleteProcess(lock, entries, name, jobID);
executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
}
-
-
+
boolean holdsStillReference(String name, JobID jobId) {
Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobId);
if (jobEntries != null) {
@@ -298,7 +297,7 @@ public class FileCache {
// ------------------------------------------------------------------------
/**
- * Asynchronous file copy process
+ * Asynchronous file copy process.
*/
private static class CopyProcess implements Callable<Path> {
@@ -333,8 +332,7 @@ public class FileCache {
private final JobID jobID;
public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
- String name, JobID jobID)
- {
+ String name, JobID jobID) {
this.lock = lock;
this.entries = entries;
this.name = name;
@@ -346,10 +344,10 @@ public class FileCache {
try {
synchronized (lock) {
Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
-
+
if (jobEntries != null) {
Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
-
+
if (entry != null) {
int count = entry.f0;
if (count > 1) {
@@ -362,7 +360,7 @@ public class FileCache {
if (jobEntries.isEmpty()) {
entries.remove(jobID);
}
-
+
// abort the copy
entry.f3.cancel(true);
@@ -376,7 +374,7 @@ public class FileCache {
LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
}
}
-
+
// remove the job wide temp directory, if it is now empty
File parent = entry.f1;
if (parent.isDirectory()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
index 4dca3db..89ab975 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
@@ -18,26 +18,25 @@
package org.apache.flink.runtime.filecache;
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Future;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.core.fs.Path;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
import org.junit.rules.TemporaryFolder;
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Future;
+
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Test delete process of {@link FileCache}. The local cache file should not be deleted when another task comes in within 5 seconds.
@@ -64,7 +63,7 @@ public class FileCacheDeleteValidationTest {
private FileCache fileCache;
private File f;
-
+
@Before
public void setup() throws IOException {
String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()};
@@ -75,7 +74,7 @@ public class FileCacheDeleteValidationTest {
e.printStackTrace();
fail("Cannot create FileCache: " + e.getMessage());
}
-
+
f = temporaryFolder.newFile("cacheFile");
try {
Files.write(testFileContent, f, Charsets.UTF_8);
@@ -102,19 +101,19 @@ public class FileCacheDeleteValidationTest {
try {
final JobID jobID = new JobID();
final String fileName = "test_file";
-
+
final String filePath = f.toURI().toString();
-
+
// copy / create the file
Future<Path> copyResult = fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID);
copyResult.get();
-
+
// get another reference to the file
Future<Path> copyResult2 = fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID);
-
+
// this should be available immediately
assertTrue(copyResult2.isDone());
-
+
// delete the file
fileCache.deleteTmpFile(fileName, jobID);
// file should not yet be deleted
@@ -124,10 +123,10 @@ public class FileCacheDeleteValidationTest {
fileCache.deleteTmpFile(fileName, jobID);
// file should still not be deleted, but remain for a bit
assertTrue(fileCache.holdsStillReference(fileName, jobID));
-
+
fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID);
fileCache.deleteTmpFile(fileName, jobID);
-
+
// after a while, the file should disappear
long deadline = System.currentTimeMillis() + 20000;
do {