You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/06/08 07:16:00 UTC

[3/4] flink git commit: [FLINK-6820] Activate checkstyle for runtime/filecache

[FLINK-6820] Activate checkstyle for runtime/filecache

This closes #4062.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ae4f2b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ae4f2b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ae4f2b0

Branch: refs/heads/master
Commit: 8ae4f2b0ae103540435244ff75ab4b593670dd76
Parents: 92cd736
Author: zentol <ch...@apache.org>
Authored: Fri Jun 2 21:06:16 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Jun 7 23:06:08 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  1 -
 .../flink/runtime/filecache/FileCache.java      | 36 +++++++++---------
 .../FileCacheDeleteValidationTest.java          | 39 ++++++++++----------
 3 files changed, 36 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 3cee8d8..602f788 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -430,7 +430,6 @@ under the License.
 						**/runtime/deployment/**,
 						**/runtime/execution/**,
 						**/runtime/executiongraph/**,
-						**/runtime/filecache/**,
 						**/runtime/fs/**,
 						**/runtime/heartbeat/**,
 						**/runtime/highavailability/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 4f2166f..84b8feb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.filecache;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.java.tuple.Tuple4;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +54,8 @@ import java.util.concurrent.TimeUnit;
 public class FileCache {
 
 	static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
-	
-	/** cache-wide lock to ensure consistency. copies are not done under this lock */
+
+	/** cache-wide lock to ensure consistency. copies are not done under this lock. */
 	private final Object lock = new Object();
 
 	private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
@@ -99,12 +99,12 @@ public class FileCache {
 		this.shutdownHook = createShutdownHook(this, LOG);
 
 		this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();
-		this.executorService = Executors.newScheduledThreadPool(10, 
+		this.executorService = Executors.newScheduledThreadPool(10,
 				new ExecutorThreadFactory("flink-file-cache"));
 	}
 
 	/**
-	 * Shuts down the file cache by cancelling all
+	 * Shuts down the file cache by cancelling all.
 	 */
 	public void shutdown() {
 		synchronized (lock) {
@@ -119,9 +119,9 @@ public class FileCache {
 					// may happen
 				}
 			}
-			
+
 			entries.clear();
-			
+
 			// clean up all the storage directories
 			for (File dir : storageDirectories) {
 				try {
@@ -172,7 +172,7 @@ public class FileCache {
 				// file is already in the cache. return a future that
 				// immediately returns the file
 				fileEntry.f0 = fileEntry.f0 + 1;
-				
+
 				// return the future. may be that the copy is still in progress
 				return fileEntry.f3;
 			}
@@ -197,10 +197,10 @@ public class FileCache {
 				CopyProcess cp = new CopyProcess(entry, target);
 				FutureTask<Path> copyTask = new FutureTask<Path>(cp);
 				executorService.submit(copyTask);
-				
+
 				// store our entry
 				jobEntries.put(name, new Tuple4<Integer, File, Path, Future<Path>>(1, tempDirToUse, target, copyTask));
-				
+
 				return copyTask;
 			}
 		}
@@ -216,8 +216,7 @@ public class FileCache {
 		DeleteProcess dp = new DeleteProcess(lock, entries, name, jobID);
 		executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
 	}
-	
-	
+
 	boolean holdsStillReference(String name, JobID jobId) {
 		Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobId);
 		if (jobEntries != null) {
@@ -298,7 +297,7 @@ public class FileCache {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Asynchronous file copy process
+	 * Asynchronous file copy process.
 	 */
 	private static class CopyProcess implements Callable<Path> {
 
@@ -333,8 +332,7 @@ public class FileCache {
 		private final JobID jobID;
 
 		public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
-								String name, JobID jobID)
-		{
+								String name, JobID jobID) {
 			this.lock = lock;
 			this.entries = entries;
 			this.name = name;
@@ -346,10 +344,10 @@ public class FileCache {
 			try {
 				synchronized (lock) {
 					Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
-					
+
 					if (jobEntries != null) {
 						Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
-						
+
 						if (entry != null) {
 							int count = entry.f0;
 							if (count > 1) {
@@ -362,7 +360,7 @@ public class FileCache {
 								if (jobEntries.isEmpty()) {
 									entries.remove(jobID);
 								}
-								
+
 								// abort the copy
 								entry.f3.cancel(true);
 
@@ -376,7 +374,7 @@ public class FileCache {
 										LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
 									}
 								}
-								
+
 								// remove the job wide temp directory, if it is now empty
 								File parent = entry.f1;
 								if (parent.isDirectory()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
index 4dca3db..89ab975 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
@@ -18,26 +18,25 @@
 
 package org.apache.flink.runtime.filecache;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Future;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.core.fs.Path;
 
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
 import org.junit.rules.TemporaryFolder;
 
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Future;
+
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
 * Test delete process of {@link FileCache}. The local cache file should not be deleted if another task comes in within 5 seconds.
@@ -64,7 +63,7 @@ public class FileCacheDeleteValidationTest {
 
 	private FileCache fileCache;
 	private File f;
-	
+
 	@Before
 	public void setup() throws IOException {
 		String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()};
@@ -75,7 +74,7 @@ public class FileCacheDeleteValidationTest {
 			e.printStackTrace();
 			fail("Cannot create FileCache: " + e.getMessage());
 		}
-		
+
 		f = temporaryFolder.newFile("cacheFile");
 		try {
 			Files.write(testFileContent, f, Charsets.UTF_8);
@@ -102,19 +101,19 @@ public class FileCacheDeleteValidationTest {
 		try {
 			final JobID jobID = new JobID();
 			final String fileName = "test_file";
-			
+
 			final String filePath = f.toURI().toString();
-			
+
 			// copy / create the file
 			Future<Path> copyResult = fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID);
 			copyResult.get();
-			
+
 			// get another reference to the file
 			Future<Path> copyResult2 = fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID);
-			
+
 			// this should be available immediately
 			assertTrue(copyResult2.isDone());
-			
+
 			// delete the file
 			fileCache.deleteTmpFile(fileName, jobID);
 			// file should not yet be deleted
@@ -124,10 +123,10 @@ public class FileCacheDeleteValidationTest {
 			fileCache.deleteTmpFile(fileName, jobID);
 			// file should still not be deleted, but remain for a bit
 			assertTrue(fileCache.holdsStillReference(fileName, jobID));
-			
+
 			fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID);
 			fileCache.deleteTmpFile(fileName, jobID);
-			
+
 			// after a while, the file should disappear
 			long deadline = System.currentTimeMillis() + 20000;
 			do {