You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 19:47:18 UTC
[flink] 03/06: [hotfix] Move BlobWriter into JobMangerSharedServices
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 35a3d7918d9d19ca9f718137179e6391854dc84a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 12:03:05 2019 +0100
[hotfix] Move BlobWriter into JobMangerSharedServices
---
.../apache/flink/runtime/dispatcher/Dispatcher.java | 4 ----
.../flink/runtime/jobmaster/JobManagerRunner.java | 3 ---
.../runtime/jobmaster/JobManagerSharedServices.java | 18 ++++++++++++++++--
.../org/apache/flink/runtime/jobmaster/JobMaster.java | 3 +--
.../flink/runtime/dispatcher/DispatcherTest.java | 6 ++----
.../dispatcher/TestingJobManagerRunnerFactory.java | 2 --
.../flink/runtime/jobmaster/JobManagerRunnerTest.java | 5 ++---
.../apache/flink/runtime/jobmaster/JobMasterTest.java | 14 --------------
.../TestingJobManagerSharedServicesBuilder.java | 12 +++++++++++-
9 files changed, 32 insertions(+), 35 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 2330180..544a438 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -314,7 +314,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
rpcService,
highAvailabilityServices,
heartbeatServices,
- blobServer,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler)),
@@ -1024,7 +1023,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
- BlobServer blobServer,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler) throws Exception;
@@ -1044,7 +1042,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
- BlobServer blobServer,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -1055,7 +1052,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
rpcService,
highAvailabilityServices,
heartbeatServices,
- blobServer,
jobManagerServices,
jobManagerJobMetricGroupFactory,
fatalErrorHandler);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 300462a..c10a535 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -109,7 +108,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
final RpcService rpcService,
final HighAvailabilityServices haServices,
final HeartbeatServices heartbeatServices,
- final BlobServer blobServer,
final JobManagerSharedServices jobManagerSharedServices,
final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
final FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -163,7 +161,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
slotPoolFactory,
jobManagerSharedServices,
heartbeatServices,
- blobServer,
jobManagerJobMetricGroupFactory,
this,
fatalErrorHandler,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
index c1e910c..6c6696f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -36,6 +37,8 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.util.ExceptionUtils;
+import javax.annotation.Nonnull;
+
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -60,18 +63,23 @@ public class JobManagerSharedServices {
private final BackPressureStatsTracker backPressureStatsTracker;
+ @Nonnull
+ private final BlobWriter blobWriter;
+
public JobManagerSharedServices(
ScheduledExecutorService scheduledExecutorService,
LibraryCacheManager libraryCacheManager,
RestartStrategyFactory restartStrategyFactory,
StackTraceSampleCoordinator stackTraceSampleCoordinator,
- BackPressureStatsTracker backPressureStatsTracker) {
+ BackPressureStatsTracker backPressureStatsTracker,
+ @Nonnull BlobWriter blobWriter) {
this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
this.stackTraceSampleCoordinator = checkNotNull(stackTraceSampleCoordinator);
this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
+ this.blobWriter = blobWriter;
}
public ScheduledExecutorService getScheduledExecutorService() {
@@ -90,6 +98,11 @@ public class JobManagerSharedServices {
return backPressureStatsTracker;
}
+ @Nonnull
+ public BlobWriter getBlobWriter() {
+ return blobWriter;
+ }
+
/**
* Shutdown the {@link JobMaster} services.
*
@@ -171,6 +184,7 @@ public class JobManagerSharedServices {
libraryCacheManager,
RestartStrategyFactory.createRestartStrategyFactory(config),
stackTraceSampleCoordinator,
- backPressureStatsTracker);
+ backPressureStatsTracker,
+ blobServer);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5a5ca19..90b8b84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -230,7 +230,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
SlotPoolFactory slotPoolFactory,
JobManagerSharedServices jobManagerSharedServices,
HeartbeatServices heartbeatServices,
- BlobWriter blobWriter,
JobManagerJobMetricGroupFactory jobMetricGroupFactory,
OnCompletionActions jobCompletionActions,
FatalErrorHandler fatalErrorHandler,
@@ -245,7 +244,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
this.jobGraph = checkNotNull(jobGraph);
this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
this.highAvailabilityServices = checkNotNull(highAvailabilityService);
- this.blobWriter = checkNotNull(blobWriter);
+ this.blobWriter = jobManagerSharedServices.getBlobWriter();
this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
this.jobCompletionActions = checkNotNull(jobCompletionActions);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 677ce5a..e10b8bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -697,10 +697,10 @@ public class DispatcherTest extends TestLogger {
}
@Override
- public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+ public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
jobManagerRunnerCreationLatch.run();
- return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+ return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
}
}
@@ -754,7 +754,6 @@ public class DispatcherTest extends TestLogger {
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
- BlobServer blobServer,
JobManagerSharedServices jobManagerSharedServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -769,7 +768,6 @@ public class DispatcherTest extends TestLogger {
rpcService,
highAvailabilityServices,
heartbeatServices,
- blobServer,
jobManagerSharedServices,
jobManagerJobMetricGroupFactory,
fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 63574cf..30e4af6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -75,7 +74,6 @@ class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFacto
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
- BlobServer blobServer,
JobManagerSharedServices jobManagerSharedServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler) throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index 08f6fe5..bf2576b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -61,7 +61,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
- * Tests for the {@link JobManagerRunner}
+ * Tests for the {@link JobManagerRunner}.
*/
public class JobManagerRunnerTest extends TestLogger {
@@ -138,7 +138,7 @@ public class JobManagerRunnerTest extends TestLogger {
rpcService.stopService();
}
}
-
+
@Test
public void testJobCompletion() throws Exception {
final JobManagerRunner jobManagerRunner = createJobManagerRunner();
@@ -235,7 +235,6 @@ public class JobManagerRunnerTest extends TestLogger {
rpcService,
haServices,
heartbeatServices,
- blobServer,
jobManagerSharedServices,
UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 56d5044..9c51eab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -34,8 +34,6 @@ import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.Checkpoints;
@@ -193,8 +191,6 @@ public class JobMasterTest extends TestLogger {
private static HeartbeatServices heartbeatServices;
- private BlobServer blobServer;
-
private Configuration configuration;
private ResourceID jmResourceId;
@@ -232,9 +228,6 @@ public class JobMasterTest extends TestLogger {
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
- blobServer = new BlobServer(configuration, new VoidBlobStore());
-
- blobServer.start();
}
@After
@@ -243,10 +236,6 @@ public class JobMasterTest extends TestLogger {
testingFatalErrorHandler.rethrowError();
}
- if (blobServer != null) {
- blobServer.close();
- }
-
rpcService.clearGateways();
}
@@ -283,7 +272,6 @@ public class JobMasterTest extends TestLogger {
DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService1),
jobManagerSharedServices,
heartbeatServices,
- blobServer,
UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
new NoOpOnCompletionActions(),
testingFatalErrorHandler,
@@ -1339,7 +1327,6 @@ public class JobMasterTest extends TestLogger {
DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
new TestingJobManagerSharedServicesBuilder().build(),
heartbeatServices,
- blobServer,
UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
new NoOpOnCompletionActions(),
testingFatalErrorHandler,
@@ -1553,7 +1540,6 @@ public class JobMasterTest extends TestLogger {
DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
jobManagerSharedServices,
heartbeatServices,
- blobServer,
UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
new NoOpOnCompletionActions(),
testingFatalErrorHandler,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
index 030e4e6..ae20d1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.jobmaster;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -45,12 +47,15 @@ public class TestingJobManagerSharedServicesBuilder {
private BackPressureStatsTracker backPressureStatsTracker;
+ private BlobWriter blobWriter;
+
public TestingJobManagerSharedServicesBuilder() {
scheduledExecutorService = TestingUtils.defaultExecutor();
libraryCacheManager = mock(LibraryCacheManager.class);
restartStrategyFactory = new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
stackTraceSampleCoordinator = mock(StackTraceSampleCoordinator.class);
backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
+ blobWriter = VoidBlobWriter.getInstance();
}
public TestingJobManagerSharedServicesBuilder setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
@@ -80,12 +85,17 @@ public class TestingJobManagerSharedServicesBuilder {
}
+ public void setBlobWriter(BlobWriter blobWriter) {
+ this.blobWriter = blobWriter;
+ }
+
public JobManagerSharedServices build() {
return new JobManagerSharedServices(
scheduledExecutorService,
libraryCacheManager,
restartStrategyFactory,
stackTraceSampleCoordinator,
- backPressureStatsTracker);
+ backPressureStatsTracker,
+ blobWriter);
}
}