You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/05 14:06:55 UTC

[02/14] flink git commit: [FLINK-7483][blob] prevent cleanup of re-registered jobs

[FLINK-7483][blob] prevent cleanup of re-registered jobs

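A job that is being registered may have been released earlier, in which case a
cleanup timeout is still pending for it. Registering the job again must
therefore reset that timeout so that the re-registered job is not cleaned up.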


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40ef9082
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40ef9082
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40ef9082

Branch: refs/heads/master
Commit: 40ef9082a971bc2397952b69670cb82ed8480a90
Parents: 4947ee6
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Aug 21 10:36:56 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 5 16:06:26 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobCache.java    | 18 +++++-
 .../runtime/blob/BlobCacheCleanupTest.java      | 60 +++++++++++++++++++-
 2 files changed, 75 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40ef9082/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index c50a888..ccbbdf0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -81,7 +81,8 @@ public class BlobCache extends TimerTask implements BlobService {
 	/**
 	 * Job reference counters with a time-to-live (TTL).
 	 */
-	private static class RefCount {
+	@VisibleForTesting
+	static class RefCount {
 		/**
 		 * Number of references to a job.
 		 */
@@ -166,6 +167,9 @@ public class BlobCache extends TimerTask implements BlobService {
 			if (ref == null) {
 				ref = new RefCount();
 				jobRefCounters.put(jobId, ref);
+			} else {
+				// reset cleanup timeout
+				ref.keepUntil = -1;
 			}
 			++ref.references;
 		}
@@ -184,7 +188,7 @@ public class BlobCache extends TimerTask implements BlobService {
 			RefCount ref = jobRefCounters.get(jobId);
 
 			if (ref == null) {
-				LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
+				LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls for jobId " + jobId);
 				return;
 			}
 
@@ -485,6 +489,16 @@ public class BlobCache extends TimerTask implements BlobService {
 	}
 
 	/**
+	 * Returns the job reference counters - for testing purposes only!
+	 *
+	 * @return job reference counters (internal state!)
+	 */
+	@VisibleForTesting
+	Map<JobID, RefCount> getJobRefCounters() {
+		return jobRefCounters;
+	}
+
+	/**
 	 * Returns a file handle to the file associated with the given blob key on the blob
 	 * server.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/40ef9082/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
index afd365b..7eef0a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.TestLogger;
 
@@ -37,7 +36,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 /**
  * A few tests for the deferred ref-counting based cleanup inside the {@link BlobCache}.
@@ -142,6 +143,63 @@ public class BlobCacheCleanupTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that {@link BlobCache} sets the expected reference counts and cleanup timeouts when
+	 * registering, releasing, and re-registering jobs.
+	 */
+	@Test
+	public void testJobReferences() throws IOException, InterruptedException {
+
+		JobID jobId = new JobID();
+
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+		config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600_000L); // 1 hour should effectively prevent races
+
+		// NOTE: use fake address - we will not connect to it here
+		InetSocketAddress serverAddress = new InetSocketAddress("localhost", 12345);
+
+		try (BlobCache cache = new BlobCache(serverAddress, config, new VoidBlobStore())) {
+
+			// register once
+			cache.registerJob(jobId);
+			assertEquals(1, cache.getJobRefCounters().get(jobId).references);
+			assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
+			// register a second time
+			cache.registerJob(jobId);
+			assertEquals(2, cache.getJobRefCounters().get(jobId).references);
+			assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
+			// release once
+			cache.releaseJob(jobId);
+			assertEquals(1, cache.getJobRefCounters().get(jobId).references);
+			assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
+			// release a second time
+			long cleanupLowerBound =
+				System.currentTimeMillis() + config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
+			cache.releaseJob(jobId);
+			assertEquals(0, cache.getJobRefCounters().get(jobId).references);
+			assertThat(cache.getJobRefCounters().get(jobId).keepUntil,
+				greaterThanOrEqualTo(cleanupLowerBound));
+
+			// register again
+			cache.registerJob(jobId);
+			assertEquals(1, cache.getJobRefCounters().get(jobId).references);
+			assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);
+
+			// finally release the job
+			cleanupLowerBound =
+				System.currentTimeMillis() + config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
+			cache.releaseJob(jobId);
+			assertEquals(0, cache.getJobRefCounters().get(jobId).references);
+			assertThat(cache.getJobRefCounters().get(jobId).keepUntil,
+				greaterThanOrEqualTo(cleanupLowerBound));
+		}
+	}
+
+	/**
 	 * Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}
 	 * but only after preserving the file for a bit longer.
 	 */