You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/05 14:06:55 UTC
[02/14] flink git commit: [FLINK-7483][blob] prevent cleanup of re-registered jobs
[FLINK-7483][blob] prevent cleanup of re-registered jobs
When a job is registered, it may have been released earlier, leaving a cleanup
timeout set for it. We thus need to reset that timeout on registration so that the
re-registered job is not cleaned up.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40ef9082
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40ef9082
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40ef9082
Branch: refs/heads/master
Commit: 40ef9082a971bc2397952b69670cb82ed8480a90
Parents: 4947ee6
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Aug 21 10:36:56 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 5 16:06:26 2017 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/blob/BlobCache.java | 18 +++++-
.../runtime/blob/BlobCacheCleanupTest.java | 60 +++++++++++++++++++-
2 files changed, 75 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/40ef9082/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index c50a888..ccbbdf0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -81,7 +81,8 @@ public class BlobCache extends TimerTask implements BlobService {
/**
* Job reference counters with a time-to-live (TTL).
*/
- private static class RefCount {
+ @VisibleForTesting
+ static class RefCount {
/**
* Number of references to a job.
*/
@@ -166,6 +167,9 @@ public class BlobCache extends TimerTask implements BlobService {
if (ref == null) {
ref = new RefCount();
jobRefCounters.put(jobId, ref);
+ } else {
+ // reset cleanup timeout
+ ref.keepUntil = -1;
}
++ref.references;
}
@@ -184,7 +188,7 @@ public class BlobCache extends TimerTask implements BlobService {
RefCount ref = jobRefCounters.get(jobId);
if (ref == null) {
- LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
+ LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls for jobId " + jobId);
return;
}
@@ -485,6 +489,16 @@ public class BlobCache extends TimerTask implements BlobService {
}
/**
+ * Returns the job reference counters - for testing purposes only!
+ *
+ * @return the internal map of job reference counters itself, not a copy (internal state!)
+ */
+ @VisibleForTesting
+ Map<JobID, RefCount> getJobRefCounters() {
+ return jobRefCounters; // hands out the field directly
+ }
+
+ /**
* Returns a file handle to the file associated with the given blob key on the blob
* server.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/40ef9082/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
index afd365b..7eef0a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TestLogger;
@@ -37,7 +36,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
/**
* A few tests for the deferred ref-counting based cleanup inside the {@link BlobCache}.
@@ -142,6 +143,63 @@ public class BlobCacheCleanupTest extends TestLogger {
}
/**
+ * Tests that {@link BlobCache} sets the expected reference counts and cleanup timeouts when
+ * registering, releasing, and re-registering jobs; re-registering must reset the timeout to -1.
+ */
+ @Test
+ public void testJobReferences() throws IOException, InterruptedException {
+
+ JobID jobId = new JobID();
+
+ Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+ temporaryFolder.newFolder().getAbsolutePath());
+ config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600_000L); // 1 hour should effectively prevent races (NOTE(review): confirm the option's unit)
+
+ // NOTE: use fake address - we will not connect to it here
+ InetSocketAddress serverAddress = new InetSocketAddress("localhost", 12345);
+
+ try (BlobCache cache = new BlobCache(serverAddress, config, new VoidBlobStore())) {
+
+ // register once: first reference, no cleanup timeout set (keepUntil == -1)
+ cache.registerJob(jobId);
+ assertEquals(1, cache.getJobRefCounters().get(jobId).references);
+ assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
+ // register a second time: the count goes up, the timeout stays unset
+ cache.registerJob(jobId);
+ assertEquals(2, cache.getJobRefCounters().get(jobId).references);
+ assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
+ // release once: one reference remains, so still no cleanup timeout
+ cache.releaseJob(jobId);
+ assertEquals(1, cache.getJobRefCounters().get(jobId).references);
+ assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
+ // release a second time: last reference gone, a cleanup timeout must now be set
+ long cleanupLowerBound = // captured before releaseJob(); keepUntil is expected to be at least this
+ System.currentTimeMillis() + config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
+ cache.releaseJob(jobId);
+ assertEquals(0, cache.getJobRefCounters().get(jobId).references);
+ assertThat(cache.getJobRefCounters().get(jobId).keepUntil,
+ greaterThanOrEqualTo(cleanupLowerBound));
+
+ // register again: re-registering a released job must reset the timeout to -1
+ cache.registerJob(jobId);
+ assertEquals(1, cache.getJobRefCounters().get(jobId).references);
+ assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
+ // finally release the job: the cleanup timeout must be set again
+ cleanupLowerBound =
+ System.currentTimeMillis() + config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
+ cache.releaseJob(jobId);
+ assertEquals(0, cache.getJobRefCounters().get(jobId).references);
+ assertThat(cache.getJobRefCounters().get(jobId).keepUntil,
+ greaterThanOrEqualTo(cleanupLowerBound));
+ }
+ }
+
+ /**
* Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}
* but only after preserving the file for a bit longer.
*/