You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/30 14:14:57 UTC

[3/4] flink git commit: [runtime] Improve error messages and add tests for failed library downloads on TaskManagers

[runtime] Improve error messages and add tests for failed library downloads on TaskManagers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2667f0e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2667f0e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2667f0e0

Branch: refs/heads/master
Commit: 2667f0e0c2dab541599269bba21e6336955d942c
Parents: 01adab5
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Mar 29 19:27:17 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 30 12:57:16 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobCache.java    |   3 +-
 .../librarycache/BlobLibraryCacheManager.java   |  18 +--
 .../BlobLibraryCacheManagerTest.java            | 123 +++++++++++++++++--
 3 files changed, 125 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2667f0e0/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 33bb7f0..fd768a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -162,7 +162,8 @@ public final class BlobCache implements BlobService {
 					}
 				}
 				catch (IOException e) {
-					String message = "Failed to fetch BLOB " + requiredBlob + "  from " + serverAddress + '.';
+					String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
+							" and store it under " + localJarFile.getAbsolutePath();
 					if (attempt < numFetchRetries) {
 						attempt++;
 						if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2667f0e0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 680f968..d394e2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -47,8 +47,6 @@ import com.google.common.base.Preconditions;
  * a set of libraries (typically JAR files) which the job requires to run. The library cache manager
  * caches library files in order to avoid unnecessary retransmission of data. It is based on a singleton
  * programming pattern, so there exists at most on library manager at a time.
- * <p>
- * This class is thread-safe.
  */
 public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager {
 
@@ -236,20 +234,20 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 			URL url = blobService.getURL(key);
 
 			Integer references = blobKeyReferenceCounters.get(key);
-			int newReferences = references == null ? 1 : references.intValue() + 1;
+			int newReferences = references == null ? 1 : references + 1;
 			blobKeyReferenceCounters.put(key, newReferences);
 
 			return url;
 		}
 		catch (IOException e) {
-			throw new IOException("Cannot access jar file stored under " + key, e);
+			throw new IOException("Cannot get library with hash " + key, e);
 		}
 	}
 	
 	private void unregisterReferenceToBlobKey(BlobKey key) {
 		Integer references = blobKeyReferenceCounters.get(key);
 		if (references != null) {
-			int newReferences = Math.max(references.intValue() - 1, 0);
+			int newReferences = Math.max(references - 1, 0);
 			blobKeyReferenceCounters.put(key, newReferences);
 		}
 		else {
@@ -261,7 +259,12 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 
 
 	// --------------------------------------------------------------------------------------------
-	
+
+	/**
+	 * An entry in the per-job library cache. Tracks which execution attempts
+	 * still reference the libraries. Once none reference it any more, the
+	 * libraries can be cleaned up.
+	 */
 	private static class LibraryCacheEntry {
 		
 		private final ClassLoader classLoader;
@@ -289,7 +292,8 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 		
 		public void register(ExecutionAttemptID task, Collection<BlobKey> keys) {
 			if (!libraries.containsAll(keys)) {
-				throw new IllegalStateException("The library registration references a different set of libraries than previous registrations for this job.");
+				throw new IllegalStateException(
+						"The library registration references a different set of libraries than previous registrations for this job.");
 			}
 			
 			this.referenceHolders.add(task);

http://git-wip-us.apache.org/repos/asf/flink/blob/2667f0e0/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 4c24b1e..239e356 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -21,23 +21,28 @@ package org.apache.flink.runtime.execution.librarycache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.junit.Test;
+
 import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 public class BlobLibraryCacheManagerTest {
 
 	@Test
-	public void testLibraryCacheManagerCleanup(){
+	public void testLibraryCacheManagerCleanup() {
 		Configuration config = new Configuration();
 
 		config.setLong(ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, 1);
@@ -65,7 +70,7 @@ public class BlobLibraryCacheManagerTest {
 
 			List<File> files = new ArrayList<File>();
 
-			for (BlobKey key: keys){
+			for (BlobKey key : keys) {
 				files.add(libraryCacheManager.getFile(key));
 			}
 
@@ -81,10 +86,10 @@ public class BlobLibraryCacheManagerTest {
 				do {
 					Thread.sleep(500);
 				}
-				while (libraryCacheManager.getNumberOfCachedLibraries() > 0 && 
+				while (libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
 						System.currentTimeMillis() < deadline);
 			}
-			
+
 			// this fails if we exited via a timeout
 			assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries());
 
@@ -94,31 +99,127 @@ public class BlobLibraryCacheManagerTest {
 				// the blob cache should no longer contain the files
 				try {
 					files.add(libraryCacheManager.getFile(key));
-				} catch (IOException ioe) {
+				}
+				catch (IOException ioe) {
 					caughtExceptions++;
 				}
 			}
 
 			assertEquals(2, caughtExceptions);
-			
+
 			bc.close();
 		}
-		catch (Exception e){
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally{
-			if (server != null){
+		finally {
+			if (server != null) {
 				server.shutdown();
 			}
 
-			if (libraryCacheManager != null){
+			if (libraryCacheManager != null) {
 				try {
 					libraryCacheManager.shutdown();
-				} catch (IOException e) {
+				}
+				catch (IOException e) {
 					e.printStackTrace();
 				}
 			}
 		}
 	}
+
+	@Test
+	public void testRegisterAndDownload() {
+		BlobServer server = null;
+		BlobCache cache = null;
+		File cacheDir = null;
+		try {
+			// create the blob transfer services
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			cache = new BlobCache(serverAddress, config);
+
+			// upload some meaningless data to the server
+			BlobClient uploader = new BlobClient(serverAddress);
+			BlobKey dataKey1 = uploader.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
+			BlobKey dataKey2 = uploader.put(new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
+			uploader.close();
+
+			BlobLibraryCacheManager libCache = new BlobLibraryCacheManager(cache, 1000000000L);
+
+			assertEquals(0, libCache.getNumberOfCachedLibraries());
+
+			// first try to access a non-existing entry
+			try {
+				libCache.getClassLoader(new JobID());
+				fail("Should fail with an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// that#s what we want
+			}
+
+			// now register some BLOBs as libraries
+			{
+				JobID jid = new JobID();
+				ExecutionAttemptID executionId = new ExecutionAttemptID();
+				Collection<BlobKey> keys = Collections.singleton(dataKey1);
+
+				libCache.registerTask(jid, executionId, keys);
+				assertEquals(1, libCache.getNumberOfReferenceHolders(jid));
+				assertEquals(1, libCache.getNumberOfCachedLibraries());
+				assertNotNull(libCache.getClassLoader(jid));
+
+				// un-register them again
+				libCache.unregisterTask(jid, executionId);
+				assertEquals(0, libCache.getNumberOfReferenceHolders(jid));
+
+				// library is still cached (but not associated with job any more)
+				assertEquals(1, libCache.getNumberOfCachedLibraries());
+
+				// should not be able to access the classloader any more
+				try {
+					libCache.getClassLoader(jid);
+					fail("Should fail with an IllegalStateException");
+				}
+				catch (IllegalStateException e) {
+					// that#s what we want
+				}
+			}
+
+			cacheDir = new File(cache.getStorageDir(), "cache");
+			assertTrue(cacheDir.exists());
+
+			// make sure no further blobs can be downloaded by removing the write
+			// permissions from the directory
+			assertTrue("Could not remove write permissions from cache directory", cacheDir.setWritable(false, false));
+
+			// since we cannot download this library any more, this call should fail
+			try {
+				libCache.registerTask(new JobID(), new ExecutionAttemptID(), Collections.singleton(dataKey2));
+				fail("This should fail with an IOException");
+			}
+			catch (IOException e) {
+				// splendid!
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (cacheDir != null) {
+				if (!cacheDir.setWritable(true, false)) {
+					System.err.println("Could not re-add write permissions to cache directory.");
+				}
+			}
+			if (cache != null) {
+				cache.shutdown();
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
 }