You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/30 14:14:57 UTC
[3/4] flink git commit: [runtime] Improve error messages and add tests for failed library downloads on TaskManagers
[runtime] Improve error messages and add tests for failed library downloads on TaskManagers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2667f0e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2667f0e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2667f0e0
Branch: refs/heads/master
Commit: 2667f0e0c2dab541599269bba21e6336955d942c
Parents: 01adab5
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Mar 29 19:27:17 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 30 12:57:16 2015 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/blob/BlobCache.java | 3 +-
.../librarycache/BlobLibraryCacheManager.java | 18 +--
.../BlobLibraryCacheManagerTest.java | 123 +++++++++++++++++--
3 files changed, 125 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2667f0e0/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 33bb7f0..fd768a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -162,7 +162,8 @@ public final class BlobCache implements BlobService {
}
}
catch (IOException e) {
- String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + '.';
+ String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
+ " and store it under " + localJarFile.getAbsolutePath();
if (attempt < numFetchRetries) {
attempt++;
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2667f0e0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 680f968..d394e2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -47,8 +47,6 @@ import com.google.common.base.Preconditions;
* a set of libraries (typically JAR files) which the job requires to run. The library cache manager
* caches library files in order to avoid unnecessary retransmission of data. It is based on a singleton
* programming pattern, so there exists at most on library manager at a time.
- * <p>
- * This class is thread-safe.
*/
public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager {
@@ -236,20 +234,20 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
URL url = blobService.getURL(key);
Integer references = blobKeyReferenceCounters.get(key);
- int newReferences = references == null ? 1 : references.intValue() + 1;
+ int newReferences = references == null ? 1 : references + 1;
blobKeyReferenceCounters.put(key, newReferences);
return url;
}
catch (IOException e) {
- throw new IOException("Cannot access jar file stored under " + key, e);
+ throw new IOException("Cannot get library with hash " + key, e);
}
}
private void unregisterReferenceToBlobKey(BlobKey key) {
Integer references = blobKeyReferenceCounters.get(key);
if (references != null) {
- int newReferences = Math.max(references.intValue() - 1, 0);
+ int newReferences = Math.max(references - 1, 0);
blobKeyReferenceCounters.put(key, newReferences);
}
else {
@@ -261,7 +259,12 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
// --------------------------------------------------------------------------------------------
-
+
+ /**
+ * An entry in the per-job library cache. Tracks which execution attempts
+ * still reference the libraries. Once none reference it any more, the
+ * libraries can be cleaned up.
+ */
private static class LibraryCacheEntry {
private final ClassLoader classLoader;
@@ -289,7 +292,8 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
public void register(ExecutionAttemptID task, Collection<BlobKey> keys) {
if (!libraries.containsAll(keys)) {
- throw new IllegalStateException("The library registration references a different set of libraries than previous registrations for this job.");
+ throw new IllegalStateException(
+ "The library registration references a different set of libraries than previous registrations for this job.");
}
this.referenceHolders.add(task);
http://git-wip-us.apache.org/repos/asf/flink/blob/2667f0e0/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 4c24b1e..239e356 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -21,23 +21,28 @@ package org.apache.flink.runtime.execution.librarycache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.junit.Test;
+
import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
public class BlobLibraryCacheManagerTest {
@Test
- public void testLibraryCacheManagerCleanup(){
+ public void testLibraryCacheManagerCleanup() {
Configuration config = new Configuration();
config.setLong(ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, 1);
@@ -65,7 +70,7 @@ public class BlobLibraryCacheManagerTest {
List<File> files = new ArrayList<File>();
- for (BlobKey key: keys){
+ for (BlobKey key : keys) {
files.add(libraryCacheManager.getFile(key));
}
@@ -81,10 +86,10 @@ public class BlobLibraryCacheManagerTest {
do {
Thread.sleep(500);
}
- while (libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
+ while (libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
System.currentTimeMillis() < deadline);
}
-
+
// this fails if we exited via a timeout
assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries());
@@ -94,31 +99,127 @@ public class BlobLibraryCacheManagerTest {
// the blob cache should no longer contain the files
try {
files.add(libraryCacheManager.getFile(key));
- } catch (IOException ioe) {
+ }
+ catch (IOException ioe) {
caughtExceptions++;
}
}
assertEquals(2, caughtExceptions);
-
+
bc.close();
}
- catch (Exception e){
+ catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- finally{
- if (server != null){
+ finally {
+ if (server != null) {
server.shutdown();
}
- if (libraryCacheManager != null){
+ if (libraryCacheManager != null) {
try {
libraryCacheManager.shutdown();
- } catch (IOException e) {
+ }
+ catch (IOException e) {
e.printStackTrace();
}
}
}
}
+ /** Registers/unregisters BLOB libraries for jobs and checks that registration fails with an IOException when a BLOB cannot be downloaded. */
+ @Test
+ public void testRegisterAndDownload() {
+ BlobServer server = null;
+ BlobCache cache = null;
+ File cacheDir = null;
+ try {
+ // create the blob transfer services: a BlobServer and a BlobCache that fetches from it
+ Configuration config = new Configuration();
+ server = new BlobServer(config);
+ InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+ cache = new BlobCache(serverAddress, config);
+
+ // upload two small BLOBs; their contents are arbitrary, only the returned BlobKeys are used below
+ BlobClient uploader = new BlobClient(serverAddress);
+ BlobKey dataKey1 = uploader.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
+ BlobKey dataKey2 = uploader.put(new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
+ uploader.close();
+ // NOTE(review): the second argument is presumably a very long cleanup interval so no cleanup runs during the test — confirm
+ BlobLibraryCacheManager libCache = new BlobLibraryCacheManager(cache, 1000000000L);
+ // NOTE(review): libCache is never shut down in the finally block below; consider calling libCache.shutdown() there
+ assertEquals(0, libCache.getNumberOfCachedLibraries());
+
+ // asking for the class loader of a job that was never registered must fail
+ try {
+ libCache.getClassLoader(new JobID());
+ fail("Should fail with an IllegalStateException");
+ }
+ catch (IllegalStateException e) {
+ // that's what we want
+ }
+
+ // register a single BLOB (dataKey1) as the library of a new job with one task
+ {
+ JobID jid = new JobID();
+ ExecutionAttemptID executionId = new ExecutionAttemptID();
+ Collection<BlobKey> keys = Collections.singleton(dataKey1);
+
+ libCache.registerTask(jid, executionId, keys);
+ assertEquals(1, libCache.getNumberOfReferenceHolders(jid));
+ assertEquals(1, libCache.getNumberOfCachedLibraries());
+ assertNotNull(libCache.getClassLoader(jid));
+
+ // unregister the job's only task again
+ libCache.unregisterTask(jid, executionId);
+ assertEquals(0, libCache.getNumberOfReferenceHolders(jid));
+
+ // library is still cached (but no longer associated with the job)
+ assertEquals(1, libCache.getNumberOfCachedLibraries());
+
+ // with no task left, the job's class loader must no longer be accessible
+ try {
+ libCache.getClassLoader(jid);
+ fail("Should fail with an IllegalStateException");
+ }
+ catch (IllegalStateException e) {
+ // that's what we want
+ }
+ }
+
+ cacheDir = new File(cache.getStorageDir(), "cache");
+ assertTrue(cacheDir.exists());
+
+ // make sure no further BLOBs can be downloaded by removing the write permissions
+ // from the local cache directory (NOTE(review): likely has no effect when run as root — confirm)
+ assertTrue("Could not remove write permissions from cache directory", cacheDir.setWritable(false, false));
+
+ // dataKey2 was never fetched and can no longer be stored locally, so this registration must fail
+ try {
+ libCache.registerTask(new JobID(), new ExecutionAttemptID(), Collections.singleton(dataKey2));
+ fail("This should fail with an IOException");
+ }
+ catch (IOException e) {
+ // expected: the library could not be downloaded
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ if (cacheDir != null) {
+ if (!cacheDir.setWritable(true, false)) {
+ System.err.println("Could not re-add write permissions to cache directory.");
+ }
+ }
+ if (cache != null) {
+ cache.shutdown();
+ }
+ if (server != null) {
+ server.shutdown();
+ }
+ }
+ }
}