You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/04 11:06:36 UTC
[2/6] git commit: [FLINK-1205] Fix library cache manager to track
references to tasks and prevent accidental duplicate
registration/deregistration
[FLINK-1205] Fix library cache manager to track references to tasks and prevent accidental duplicate registration/deregistration
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a6152c37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a6152c37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a6152c37
Branch: refs/heads/master
Commit: a6152c372b86d3a62745b3703975d5d4eb243053
Parents: 5e48fc9
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 3 20:43:00 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 20:43:00 2014 +0100
----------------------------------------------------------------------
.../librarycache/BlobLibraryCacheManager.java | 326 +++++++++----------
.../FallbackLibraryCacheManager.java | 16 +-
.../librarycache/LibraryCacheManager.java | 25 +-
.../flink/runtime/jobmanager/JobManager.java | 6 +-
.../flink/runtime/taskmanager/TaskManager.java | 11 +-
.../BlobLibraryCacheManagerTest.java | 6 +-
6 files changed, 206 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 441f57e..70b60fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -30,17 +31,18 @@ import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* For each job graph that is submitted to the system the library cache manager maintains
* a set of libraries (typically JAR files) which the job requires to run. The library cache manager
@@ -52,43 +54,25 @@ import org.slf4j.LoggerFactory;
public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager {
private static Logger LOG = LoggerFactory.getLogger(BlobLibraryCacheManager.class);
-
- /**
- * Dummy object used in the lock map.
- */
+
+ private static ExecutionAttemptID JOB_ATTEMPT_ID = new ExecutionAttemptID();
+
+ // --------------------------------------------------------------------------------------------
+
+ /** The global lock to synchronize operations */
private final Object lockObject = new Object();
- /**
- * Map to translate a job ID to the responsible class loaders.
- */
- private final ConcurrentMap<JobID, URLClassLoader> classLoaders = new
- ConcurrentHashMap<JobID, URLClassLoader>();
-
- /**
- * Map to store the number of references to a specific library manager entry.
- */
- private final Map<JobID, Integer> libraryReferenceCounter = new HashMap<JobID,
- Integer>();
-
- /**
- * Map to store the blob keys referenced by a specific job
- */
- private final Map<JobID, Collection<BlobKey>> requiredJars = new
- HashMap<JobID, Collection<BlobKey>>();
-
- /**
- * Map to store the number of reference to a specific file
- */
- private final Map<BlobKey, Integer> blobKeyReferenceCounter = new
- HashMap<BlobKey, Integer>();
-
- /**
- * All registered blobs
- */
- private final Set<BlobKey> registeredBlobs = new HashSet<BlobKey>();
+ /** Registered entries per job */
+ private final Map<JobID, LibraryCacheEntry> cacheEntries = new HashMap<JobID, LibraryCacheEntry>();
+
+ /** Map to store the number of reference to a specific file */
+ private final Map<BlobKey, Integer> blobKeyReferenceCounters = new HashMap<BlobKey, Integer>();
+ /** The blob service to download libraries */
private final BlobService blobService;
-
+
+ // --------------------------------------------------------------------------------------------
+
public BlobLibraryCacheManager(BlobService blobService, Configuration configuration){
this.blobService = blobService;
@@ -100,143 +84,82 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
timer.schedule(this, cleanupInterval);
}
- /**
- * Increments the reference counter of the corrsponding map
- *
- * @param key
- * the key identifying the counter to increment
- * @return the increased reference counter
- */
- private <K> int incrementReferenceCounter(final K key, final Map<K,
- Integer> map) {
-
- if(!map.containsKey(key)){
- map.put(key, 1);
-
- return 1;
- }else{
- int counter = map.get(key) + 1;
- map.put(key, counter);
-
- return counter;
- }
- }
-
- /**
- * Decrements the reference counter associated with the key
- *
- * @param key
- * the key identifying the counter to decrement
- * @return the decremented reference counter
- */
- private <K> int decrementReferenceCounter(final K key, final Map<K,
- Integer> map) {
-
- if (!map.containsKey(key)) {
- throw new IllegalStateException("Cannot find reference counter entry for key " + key);
- }else{
- int counter = map.get(key) -1;
-
- if(counter == 0){
- map.remove(key);
- }else{
- map.put(key, counter);
- }
-
- return counter;
- }
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) throws IOException {
+ registerTask(id, JOB_ATTEMPT_ID, requiredJarFiles);
}
-
- /**
- * Registers a job ID with a set of library paths that are required to run the job. For every registered
- * job the library cache manager creates a class loader that is used to instantiate the vertex's environment later
- * on.
- *
- * @param id
- * the ID of the job to be registered.
- * @param requiredJarFiles
- * the client path's of the required libraries
- * @throws IOException
- * thrown if one of the requested libraries is not in the cache
- */
+
@Override
- public void register(final JobID id, final Collection<BlobKey> requiredJarFiles) throws
- IOException {
-
+ public void registerTask(JobID jobId, ExecutionAttemptID task, Collection<BlobKey> requiredJarFiles) throws IOException {
+ Preconditions.checkNotNull(jobId, "The JobId must not be null.");
+ Preconditions.checkNotNull(task, "The task execution id must not be null.");
+
+ if (requiredJarFiles == null) {
+ requiredJarFiles = Collections.emptySet();
+ }
+
synchronized (lockObject) {
- if (incrementReferenceCounter(id, libraryReferenceCounter) > 1) {
- return;
- }
-
- // Check if library manager entry for this id already exists
- if (this.classLoaders.containsKey(id)) {
- throw new IllegalStateException("Library cache manager already contains " +
- "class loader entry for job ID " + id);
- }
-
- if (requiredJars.containsKey(id)) {
- throw new IllegalStateException("Library cache manager already contains blob keys" +
- " entry for job ID " + id);
+ LibraryCacheEntry entry = cacheEntries.get(jobId);
+
+ if (entry == null) {
+ URL[] urls = new URL[requiredJarFiles.size()];
+
+ int count = 0;
+ for (BlobKey blobKey : requiredJarFiles) {
+ urls[count++] = registerReferenceToBlobKeyAndGetURL(blobKey);
+ }
+
+ URLClassLoader classLoader = new URLClassLoader(urls);
+ cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, classLoader, task));
}
-
- requiredJars.put(id, requiredJarFiles);
-
- URL[] urls = new URL[requiredJarFiles.size()];
- int count = 0;
-
- for (BlobKey blobKey : requiredJarFiles) {
- urls[count++] = registerBlobKeyAndGetURL(blobKey);
+ else {
+ entry.register(task, requiredJarFiles);
}
-
- final URLClassLoader classLoader = new URLClassLoader(urls);
- this.classLoaders.put(id, classLoader);
}
}
- private URL registerBlobKeyAndGetURL(BlobKey key) throws IOException{
- if(incrementReferenceCounter(key, blobKeyReferenceCounter) == 1){
- // registration might happen even if the file is already stored locally
- registeredBlobs.add(key);
- }
-
- return blobService.getURL(key);
+ @Override
+ public void unregisterJob(JobID id) {
+ unregisterTask(id, JOB_ATTEMPT_ID);
}
-
- /**
- * Unregisters a job ID and releases the resources associated with it.
- *
- * @param id
- * the job ID to unregister
- */
+
@Override
- public void unregister(final JobID id) {
+ public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
+ Preconditions.checkNotNull(jobId, "The JobId must not be null.");
+ Preconditions.checkNotNull(task, "The task execution id must not be null.");
+
synchronized (lockObject) {
- if (decrementReferenceCounter(id, libraryReferenceCounter) == 0) {
- this.classLoaders.remove(id);
-
- Collection<BlobKey> keys = requiredJars.get(id);
-
- for (BlobKey key : keys) {
- decrementReferenceCounter(key, blobKeyReferenceCounter);
+ LibraryCacheEntry entry = cacheEntries.get(jobId);
+
+ if (entry != null) {
+ if (entry.unregister(task)) {
+ cacheEntries.remove(jobId);
+
+ for (BlobKey key : entry.getLibraries()) {
+ unregisterReferenceToBlobKey(key);
+ }
}
-
- requiredJars.remove(id);
}
+ // else has already been unregistered
}
-
}
- /**
- * Returns the class loader to the specified vertex.
- *
- * @param id
- * the ID of the job to return the class loader for
- * @return the class loader of requested vertex or <code>null</code> if no class loader has been registered with the
- * given ID.
- */
@Override
- public ClassLoader getClassLoader(final JobID id) {
- return this.classLoaders.get(id);
+ public ClassLoader getClassLoader(JobID id) {
+ if (id == null) {
+ throw new IllegalArgumentException("The JobId must not be null.");
+ }
+
+ synchronized (lockObject) {
+ LibraryCacheEntry entry = cacheEntries.get(id);
+ if (entry != null) {
+ return entry.getClassLoader();
+ } else {
+ throw new IllegalStateException("No libraries are registered for job " + id);
+ }
+ }
}
@Override
@@ -252,27 +175,100 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
public void shutdown() throws IOException{
blobService.shutdown();
}
-
+
/**
* Cleans up blobs which are not referenced anymore
*/
@Override
public void run() {
synchronized (lockObject) {
- Iterator<BlobKey> it = registeredBlobs.iterator();
-
- while (it.hasNext()) {
- BlobKey key = it.next();
-
+
+ Iterator<Map.Entry<BlobKey, Integer>> entryIter = blobKeyReferenceCounters.entrySet().iterator();
+
+ while (entryIter.hasNext()) {
+ Map.Entry<BlobKey, Integer> entry = entryIter.next();
+ BlobKey key = entry.getKey();
+ int references = entry.getValue();
+
try {
- if (!blobKeyReferenceCounter.containsKey(key)) {
+ if (references <= 0) {
blobService.delete(key);
- it.remove();
+ entryIter.remove();
}
- } catch (IOException ioe) {
- LOG.warn("Could not delete file with blob key" + key, ioe);
+ } catch (Throwable t) {
+ LOG.warn("Could not delete file with blob key" + key, t);
}
}
}
}
+
+ public int getNumberOfReferenceHolders(JobID jobId) {
+ synchronized (lockObject) {
+ LibraryCacheEntry entry = cacheEntries.get(jobId);
+ return entry == null ? 0 : entry.getNumberOfReferenceHolders();
+ }
+ }
+
+ private URL registerReferenceToBlobKeyAndGetURL(BlobKey key) throws IOException {
+
+ Integer references = blobKeyReferenceCounters.get(key);
+ int newReferences = references == null ? 1 : references.intValue() + 1;
+
+ blobKeyReferenceCounters.put(key, newReferences);
+
+ return blobService.getURL(key);
+ }
+
+ private void unregisterReferenceToBlobKey(BlobKey key) {
+ Integer references = blobKeyReferenceCounters.get(key);
+ if (references != null) {
+ int newReferences = Math.max(references.intValue() - 1, 0);
+ blobKeyReferenceCounters.put(key, newReferences);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static class LibraryCacheEntry {
+
+ private final ClassLoader classLoader;
+
+ private final Set<ExecutionAttemptID> referenceHolders;
+
+ private final Set<BlobKey> libraries;
+
+
+ public LibraryCacheEntry(Collection<BlobKey> libraries, ClassLoader classLoader, ExecutionAttemptID initialReference) {
+ this.classLoader = classLoader;
+ this.libraries = new HashSet<BlobKey>(libraries);
+ this.referenceHolders = new HashSet<ExecutionAttemptID>();
+ this.referenceHolders.add(initialReference);
+ }
+
+
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+
+ public Set<BlobKey> getLibraries() {
+ return libraries;
+ }
+
+ public void register(ExecutionAttemptID task, Collection<BlobKey> keys) {
+ if (!libraries.containsAll(keys)) {
+ throw new IllegalStateException("The library registration references a different set of libraries than previous registrations for this job.");
+ }
+
+ this.referenceHolders.add(task);
+ }
+
+ public boolean unregister(ExecutionAttemptID task) {
+ referenceHolders.remove(task);
+ return referenceHolders.isEmpty();
+ }
+
+ public int getNumberOfReferenceHolders() {
+ return referenceHolders.size();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
index 5c162a0..532107f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.execution.librarycache;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@ import java.io.IOException;
import java.util.Collection;
public class FallbackLibraryCacheManager implements LibraryCacheManager {
+
private static Logger LOG = LoggerFactory.getLogger(FallbackLibraryCacheManager.class);
@Override
@@ -41,12 +43,22 @@ public class FallbackLibraryCacheManager implements LibraryCacheManager {
}
@Override
- public void register(JobID id, Collection<BlobKey> requiredJarFiles) throws IOException {
+ public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) {
+ LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
+ }
+
+ @Override
+ public void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles) {
LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
}
@Override
- public void unregister(JobID id) {
+ public void unregisterJob(JobID id) {
+ LOG.warn("FallbackLibraryCacheManager does not book keeping of job IDs.");
+ }
+
+ @Override
+ public void unregisterTask(JobID id, ExecutionAttemptID execution) {
LOG.warn("FallbackLibraryCacheManager does not book keeping of job IDs.");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index a0a95ab..63d85b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.execution.librarycache;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import java.io.File;
@@ -32,7 +33,7 @@ public interface LibraryCacheManager {
* @param id identifying the job
* @return ClassLoader which can load the user code
*/
- ClassLoader getClassLoader(final JobID id);
+ ClassLoader getClassLoader(JobID id);
/**
* Returns a file handle to the file identified by the blob key.
@@ -41,7 +42,7 @@ public interface LibraryCacheManager {
* @return File handle
* @throws IOException
*/
- File getFile(final BlobKey blobKey) throws IOException;
+ File getFile(BlobKey blobKey) throws IOException;
/**
* Registers a job with its required jar files. The jar files are identified by their blob keys.
@@ -50,14 +51,30 @@ public interface LibraryCacheManager {
* @param requiredJarFiles collection of blob keys identifying the required jar files
* @throws IOException
*/
- void register(final JobID id, final Collection<BlobKey> requiredJarFiles) throws IOException;
+ void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) throws IOException;
+
+ /**
+ * Registers a job task execution with its required jar files. The jar files are identified by their blob keys.
+ *
+ * @param id job ID
+ * @param requiredJarFiles collection of blob keys identifying the required jar files
+ * @throws IOException
+ */
+ void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles) throws IOException;
/**
* Unregisters a job from the library cache manager.
*
* @param id job ID
*/
- void unregister(final JobID id);
+ void unregisterTask(JobID id, ExecutionAttemptID execution);
+
+ /**
+ * Unregisters a job from the library cache manager.
+ *
+ * @param id job ID
+ */
+ void unregisterJob(JobID id);
/**
* Shutdown method
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 4bce4a4..7fb4f94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -327,7 +327,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
}
// Register this job with the library cache manager
- libraryCacheManager.register(job.getJobID(), job.getUserJarBlobKeys());
+ libraryCacheManager.registerJob(job.getJobID(), job.getUserJarBlobKeys());
// get the existing execution graph (if we attach), or construct a new empty one to attach
executionGraph = this.currentJobs.get(job.getJobID());
@@ -425,7 +425,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
// job was not prperly removed by the fail call
if(currentJobs.contains(job.getJobID())){
currentJobs.remove(job.getJobID());
- libraryCacheManager.unregister(job.getJobID());
+ libraryCacheManager.unregisterJob(job.getJobID());
}
return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
@@ -524,7 +524,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
this.currentJobs.remove(jid);
try {
- libraryCacheManager.unregister(jid);
+ libraryCacheManager.unregisterJob(jid);
}
catch (Throwable t) {
LOG.warn("Could not properly unregister job " + jid + " from the library cache.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 74ea220..1380d91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -574,7 +574,6 @@ public class TaskManager implements TaskOperationProtocol {
final int numSubtasks = tdd.getCurrentNumberOfSubtasks();
Task task = null;
- boolean jarsRegistered = false;
// check if the taskmanager is shut down or disconnected
if (shutdownStarted.get()) {
@@ -586,8 +585,7 @@ public class TaskManager implements TaskOperationProtocol {
try {
// Now register data with the library manager
- libraryCacheManager.register(jobID, tdd.getRequiredJarFiles());
- jarsRegistered = true;
+ libraryCacheManager.registerTask(jobID, executionId, tdd.getRequiredJarFiles());
// library and classloader issues first
final ClassLoader userCodeClassLoader = libraryCacheManager.getClassLoader(jobID);
@@ -659,9 +657,8 @@ public class TaskManager implements TaskOperationProtocol {
if (task != null) {
removeAllTaskResources(task);
}
- if (jarsRegistered) {
- libraryCacheManager.unregister(jobID);
- }
+
+ libraryCacheManager.unregisterTask(jobID, executionId);
}
catch (Throwable t2) {
LOG.error("Error during cleanup of task deployment", t2);
@@ -690,7 +687,7 @@ public class TaskManager implements TaskOperationProtocol {
removeAllTaskResources(task);
// Unregister task from library cache manager
- libraryCacheManager.unregister(task.getJobID());
+ libraryCacheManager.unregisterTask(task.getJobID(), executionId);
}
private void removeAllTaskResources(Task task) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index df32a81..67dd8a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -60,18 +60,18 @@ public class BlobLibraryCacheManagerTest {
keys.add(bc.put(buf));
libraryCacheManager = new BlobLibraryCacheManager(server, GlobalConfiguration.getConfiguration());
- libraryCacheManager.register(jid, keys);
+ libraryCacheManager.registerJob(jid, keys);
List<File> files = new ArrayList<File>();
- for(BlobKey key: keys){
+ for (BlobKey key: keys){
files.add(libraryCacheManager.getFile(key));
}
assertEquals(2, files.size());
files.clear();
- libraryCacheManager.unregister(jid);
+ libraryCacheManager.unregisterJob(jid);
Thread.sleep(1500);