You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/18 07:36:48 UTC
[3/5] flink git commit: [FLINK-7057][blob] move ref-counting from the
LibraryCacheManager to the BlobCache
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 23f0a38..933c7a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -27,7 +28,6 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -90,6 +90,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -158,6 +159,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
+ flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
try {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
@@ -179,6 +181,9 @@ public class JobManagerHARecoveryTest extends TestLogger {
archive = system.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 10, Option.<Path>empty()));
+ BlobServer blobServer = new BlobServer(
+ flinkConfiguration,
+ testingHighAvailabilityServices.createBlobStore());
Props jobManagerProps = Props.create(
TestingJobManager.class,
flinkConfiguration,
@@ -186,11 +191,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
TestingUtils.defaultExecutor(),
instanceManager,
scheduler,
- new BlobLibraryCacheManager(
- new BlobServer(
- flinkConfiguration,
- testingHighAvailabilityServices.createBlobStore()),
- 3600000L),
+ blobServer,
+ new BlobLibraryCacheManager(blobServer),
archive,
new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
timeout,
@@ -353,6 +355,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
final Collection<JobID> recoveredJobs = new ArrayList<>(2);
+ BlobServer blobServer = mock(BlobServer.class);
Props jobManagerProps = Props.create(
TestingFailingHAJobManager.class,
flinkConfiguration,
@@ -360,7 +363,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
TestingUtils.defaultExecutor(),
mock(InstanceManager.class),
mock(Scheduler.class),
- new BlobLibraryCacheManager(mock(BlobService.class), 1 << 20),
+ blobServer,
+ new BlobLibraryCacheManager(blobServer),
ActorRef.noSender(),
new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
timeout,
@@ -397,6 +401,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
Executor ioExecutor,
InstanceManager instanceManager,
Scheduler scheduler,
+ BlobServer blobServer,
BlobLibraryCacheManager libraryCacheManager,
ActorRef archive,
RestartStrategyFactory restartStrategyFactory,
@@ -413,6 +418,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
ioExecutor,
instanceManager,
scheduler,
+ blobServer,
libraryCacheManager,
archive,
restartStrategyFactory,
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 3c75971..6a39293 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -137,14 +137,13 @@ public class JobSubmitTest {
// upload two dummy bytes and add their keys to the job graph as dependencies
BlobKey key1, key2;
BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort), jmConfig);
- // TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
- JobID jobId = null;
+ JobID jobId = jg.getJobID();
try {
key1 = bc.put(jobId, new byte[10]);
key2 = bc.put(jobId, new byte[10]);
// delete one of the blobs to make sure that the startup failed
- bc.delete(key2);
+ bc.delete(jobId, key2);
}
finally {
bc.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index df35369..2c17b5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -92,8 +93,8 @@ public class JobMasterTest extends TestLogger {
final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
- final BlobLibraryCacheManager libraryCacheManager = mock(BlobLibraryCacheManager.class);
- when(libraryCacheManager.getBlobServerPort()).thenReturn(1337);
+ BlobServer blobServer = mock(BlobServer.class);
+ when(blobServer.getPort()).thenReturn(1337);
final JobGraph jobGraph = new JobGraph();
@@ -106,7 +107,8 @@ public class JobMasterTest extends TestLogger {
haServices,
heartbeatServices,
Executors.newScheduledThreadPool(1),
- libraryCacheManager,
+ blobServer,
+ mock(BlobLibraryCacheManager.class),
mock(RestartStrategyFactory.class),
Time.of(10, TimeUnit.SECONDS),
null,
@@ -204,6 +206,7 @@ public class JobMasterTest extends TestLogger {
haServices,
heartbeatServices,
Executors.newScheduledThreadPool(1),
+ mock(BlobServer.class),
mock(BlobLibraryCacheManager.class),
mock(RestartStrategyFactory.class),
Time.of(10, TimeUnit.SECONDS),
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 70800e5..230ca91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -25,9 +25,10 @@ import akka.actor.Props;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
-
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
@@ -47,13 +48,11 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -178,6 +177,9 @@ public class JobManagerLeaderElectionTest extends TestLogger {
SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+ configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+ BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore());
return Props.create(
TestingJobManager.class,
configuration,
@@ -185,7 +187,8 @@ public class JobManagerLeaderElectionTest extends TestLogger {
TestingUtils.defaultExecutor(),
new InstanceManager(),
new Scheduler(TestingUtils.defaultExecutionContext()),
- new BlobLibraryCacheManager(new BlobServer(configuration, new VoidBlobStore()), 10L),
+ blobServer,
+ new BlobLibraryCacheManager(blobServer),
ActorRef.noSender(),
new NoRestartStrategy.NoRestartStrategyFactory(),
AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 43ff60b..6842bee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -668,7 +669,7 @@ public class TaskExecutorTest extends TestLogger {
Collections.<InputGateDeploymentDescriptor>emptyList());
final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class);
- when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
+ when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
final JobManagerConnection jobManagerConnection = new JobManagerConnection(
jobId,
@@ -677,6 +678,7 @@ public class TaskExecutorTest extends TestLogger {
jobManagerLeaderId,
mock(TaskManagerActions.class),
mock(CheckpointResponder.class),
+ mock(BlobCache.class),
libraryCacheManager,
mock(ResultPartitionConsumableNotifier.class),
mock(PartitionProducerStateChecker.class));
@@ -1191,6 +1193,7 @@ public class TaskExecutorTest extends TestLogger {
jobManagerLeaderId,
mock(TaskManagerActions.class),
mock(CheckpointResponder.class),
+ mock(BlobCache.class),
libraryCacheManager,
mock(ResultPartitionConsumableNotifier.class),
mock(PartitionProducerStateChecker.class));
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 085a386..392dc29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -145,6 +146,7 @@ public class TaskAsyncCallTest {
}
private static Task createTask() throws Exception {
+ BlobCache blobCache = mock(BlobCache.class);
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
@@ -195,6 +197,7 @@ public class TaskAsyncCallTest {
mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
+ blobCache,
libCache,
mock(FileCache.class),
new TestingTaskManagerRuntimeInfo(),
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 1ebd4ad..ac0df36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -98,6 +99,7 @@ public class TaskStopTest {
mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
+ mock(BlobCache.class),
mock(LibraryCacheManager.class),
mock(FileCache.class),
tmRuntimeInfo,
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ba3e820..d4cd0cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -227,7 +228,8 @@ public class TaskTest extends TestLogger {
@Test
public void testLibraryCacheRegistrationFailed() {
try {
- Task task = createTask(TestInvokableCorrect.class, mock(LibraryCacheManager.class));
+ Task task = createTask(TestInvokableCorrect.class, mock(BlobCache.class),
+ mock(LibraryCacheManager.class));
// task should be new and perfect
assertEquals(ExecutionState.CREATED, task.getExecutionState());
@@ -260,6 +262,7 @@ public class TaskTest extends TestLogger {
@Test
public void testExecutionFailsInNetworkRegistration() {
try {
+ BlobCache blobCache = mock(BlobCache.class);
// mock a working library cache
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
@@ -274,7 +277,7 @@ public class TaskTest extends TestLogger {
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
- Task task = createTask(TestInvokableCorrect.class, libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
+ Task task = createTask(TestInvokableCorrect.class, blobCache, libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
task.registerExecutionListener(listener);
@@ -617,6 +620,7 @@ public class TaskTest extends TestLogger {
IntermediateDataSetID resultId = new IntermediateDataSetID();
ResultPartitionID partitionId = new ResultPartitionID();
+ BlobCache blobCache = mock(BlobCache.class);
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
@@ -629,7 +633,7 @@ public class TaskTest extends TestLogger {
when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
.thenReturn(mock(TaskKvStateRegistry.class));
- createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+ createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
// Test all branches of trigger partition state check
@@ -638,7 +642,7 @@ public class TaskTest extends TestLogger {
createQueuesAndActors();
// PartitionProducerDisposedException
- Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+ Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -654,7 +658,7 @@ public class TaskTest extends TestLogger {
createQueuesAndActors();
// Any other exception
- Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+ Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -671,7 +675,7 @@ public class TaskTest extends TestLogger {
createQueuesAndActors();
// TimeoutException handled special => retry
- Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+ Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
SingleInputGate inputGate = mock(SingleInputGate.class);
when(inputGate.getConsumedResultId()).thenReturn(resultId);
@@ -702,7 +706,7 @@ public class TaskTest extends TestLogger {
createQueuesAndActors();
// Success
- Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+ Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
SingleInputGate inputGate = mock(SingleInputGate.class);
when(inputGate.getConsumedResultId()).thenReturn(resultId);
@@ -882,26 +886,30 @@ public class TaskTest extends TestLogger {
}
private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config) throws IOException {
+ BlobCache blobCache = mock(BlobCache.class);
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
- return createTask(invokable, libCache, config, new ExecutionConfig());
+ return createTask(invokable, blobCache,libCache, config, new ExecutionConfig());
}
private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config, ExecutionConfig execConfig) throws IOException {
+ BlobCache blobCache = mock(BlobCache.class);
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
- return createTask(invokable, libCache, config, execConfig);
+ return createTask(invokable, blobCache,libCache, config, execConfig);
}
private Task createTask(
Class<? extends AbstractInvokable> invokable,
+ BlobCache blobCache,
LibraryCacheManager libCache) throws IOException {
- return createTask(invokable, libCache, new Configuration(), new ExecutionConfig());
+ return createTask(invokable, blobCache,libCache, new Configuration(), new ExecutionConfig());
}
private Task createTask(
Class<? extends AbstractInvokable> invokable,
+ BlobCache blobCache,
LibraryCacheManager libCache,
Configuration config,
ExecutionConfig execConfig) throws IOException {
@@ -916,21 +924,23 @@ public class TaskTest extends TestLogger {
when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
.thenReturn(mock(TaskKvStateRegistry.class));
- return createTask(invokable, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig);
+ return createTask(invokable, blobCache, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig);
}
private Task createTask(
Class<? extends AbstractInvokable> invokable,
+ BlobCache blobCache,
LibraryCacheManager libCache,
NetworkEnvironment networkEnvironment,
ResultPartitionConsumableNotifier consumableNotifier,
PartitionProducerStateChecker partitionProducerStateChecker,
Executor executor) throws IOException {
- return createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig());
+ return createTask(invokable, blobCache, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig());
}
private Task createTask(
Class<? extends AbstractInvokable> invokable,
+ BlobCache blobCache,
LibraryCacheManager libCache,
NetworkEnvironment networkEnvironment,
ResultPartitionConsumableNotifier consumableNotifier,
@@ -991,6 +1001,7 @@ public class TaskTest extends TestLogger {
taskManagerConnection,
inputSplitProvider,
checkpointResponder,
+ blobCache,
libCache,
mock(FileCache.class),
new TestingTaskManagerRuntimeInfo(taskManagerConfig),
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
new file mode 100644
index 0000000..37c141d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * Task which blocks until the (static) {@link #unblock()} method is called and then fails with an
+ * exception.
+ */
+public class FailingBlockingInvokable extends AbstractInvokable {
+ private static volatile boolean blocking = true;
+ private static final Object lock = new Object();
+
+ @Override
+ public void invoke() throws Exception {
+ while (blocking) {
+ synchronized (lock) {
+ lock.wait();
+ }
+ }
+ throw new RuntimeException("This exception is expected.");
+ }
+
+ public static void unblock() {
+ blocking = false;
+
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index c1df5a3..229f1eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -178,6 +179,7 @@ public class JvmExitOnFatalErrorTest {
new NoOpTaskManagerActions(),
new NoOpInputSplitProvider(),
new NoOpCheckpointResponder(),
+ mock(BlobCache.class),
new FallbackLibraryCacheManager(),
new FileCache(tmInfo.getTmpDirectories()),
tmInfo,
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 1b9ee48..95da981 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -264,14 +264,15 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor
components._1,
components._2,
components._3,
- ActorRef.noSender,
components._4,
+ ActorRef.noSender,
components._5,
+ components._6,
highAvailabilityServices.getJobManagerLeaderElectionService(
HighAvailabilityServices.DEFAULT_JOB_ID),
highAvailabilityServices.getSubmittedJobGraphStore(),
highAvailabilityServices.getCheckpointRecoveryFactory(),
- components._8,
+ components._9,
None)
_system.actorOf(props)
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index e5655bb..87f8088 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -28,6 +28,7 @@ import akka.testkit.CallingThreadDispatcher
import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.{Configuration, JobManagerOptions}
import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.blob.BlobServer
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
import org.apache.flink.runtime.checkpoint.{CheckpointOptions, CheckpointRecoveryFactory}
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -110,6 +111,7 @@ class TestingCluster(
ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: Scheduler,
+ blobServer: BlobServer,
libraryCacheManager: BlobLibraryCacheManager,
archive: ActorRef,
restartStrategyFactory: RestartStrategyFactory,
@@ -127,6 +129,7 @@ class TestingCluster(
ioExecutor,
instanceManager,
scheduler,
+ blobServer,
libraryCacheManager,
archive,
restartStrategyFactory,
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index f50a832..8b9ce15 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.blob.BlobServer
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -34,15 +35,16 @@ import org.apache.flink.runtime.metrics.MetricRegistry
import scala.concurrent.duration._
import scala.language.postfixOps
-/** JobManager implementation extended by testing messages
- *
- */
+/**
+ * JobManager implementation extended by testing messages
+ */
class TestingJobManager(
flinkConfiguration: Configuration,
futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: Scheduler,
+ blobServer: BlobServer,
libraryCacheManager: BlobLibraryCacheManager,
archive: ActorRef,
restartStrategyFactory: RestartStrategyFactory,
@@ -58,6 +60,7 @@ class TestingJobManager(
ioExecutor,
instanceManager,
scheduler,
+ blobServer,
libraryCacheManager,
archive,
restartStrategyFactory,
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 3b8178b..82642ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -156,6 +157,7 @@ public class BlockingCheckpointsTest {
mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
+ mock(BlobCache.class),
new FallbackLibraryCacheManager(),
new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
new TestingTaskManagerRuntimeInfo(),
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 82e4f31..14ae733 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -274,6 +275,7 @@ public class InterruptSensitiveRestoreTest {
mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
+ mock(BlobCache.class),
new FallbackLibraryCacheManager(),
new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
new TestingTaskManagerRuntimeInfo(),
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 702d833..79e9583 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -153,6 +154,7 @@ public class StreamTaskTerminationTest extends TestLogger {
mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
+ mock(BlobCache.class),
new FallbackLibraryCacheManager(),
mock(FileCache.class),
taskManagerRuntimeInfo,
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 09e9a1b..08c3207 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -796,6 +797,7 @@ public class StreamTaskTest extends TestLogger {
StreamConfig taskConfig,
Configuration taskManagerConfig) throws Exception {
+ BlobCache blobCache = mock(BlobCache.class);
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader());
@@ -844,6 +846,7 @@ public class StreamTaskTest extends TestLogger {
mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
+ blobCache,
libCache,
mock(FileCache.class),
new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}),
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index b539961..bd72d6d 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.blob.BlobServer
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -58,6 +59,7 @@ class TestingYarnJobManager(
ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: Scheduler,
+ blobServer: BlobServer,
libraryCacheManager: BlobLibraryCacheManager,
archive: ActorRef,
restartStrategyFactory: RestartStrategyFactory,
@@ -73,6 +75,7 @@ class TestingYarnJobManager(
ioExecutor,
instanceManager,
scheduler,
+ blobServer,
libraryCacheManager,
archive,
restartStrategyFactory,
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index a2d1668..b8dacee 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
import akka.actor.ActorRef
import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.blob.BlobServer
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.clusterframework.ContaineredJobManager
import org.apache.flink.runtime.clusterframework.messages.StopCluster
@@ -49,7 +50,8 @@ import scala.language.postfixOps
* @param instanceManager Instance manager to manage the registered
* [[org.apache.flink.runtime.taskmanager.TaskManager]]
* @param scheduler Scheduler to schedule Flink jobs
- * @param libraryCacheManager Manager to manage uploaded jar files
+ * @param blobServer BLOB store for file uploads
+ * @param libraryCacheManager manages uploaded jar files and class paths
* @param archive Archive for finished Flink jobs
* @param restartStrategyFactory Restart strategy to be used in case of a job recovery
* @param timeout Timeout for futures
@@ -61,6 +63,7 @@ class YarnJobManager(
ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: FlinkScheduler,
+ blobServer: BlobServer,
libraryCacheManager: BlobLibraryCacheManager,
archive: ActorRef,
restartStrategyFactory: RestartStrategyFactory,
@@ -76,6 +79,7 @@ class YarnJobManager(
ioExecutor,
instanceManager,
scheduler,
+ blobServer,
libraryCacheManager,
archive,
restartStrategyFactory,