You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 19:47:18 UTC

[flink] 03/06: [hotfix] Move BlobWriter into JobMangerSharedServices

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35a3d7918d9d19ca9f718137179e6391854dc84a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 12:03:05 2019 +0100

    [hotfix] Move BlobWriter into JobMangerSharedServices
---
 .../apache/flink/runtime/dispatcher/Dispatcher.java    |  4 ----
 .../flink/runtime/jobmaster/JobManagerRunner.java      |  3 ---
 .../runtime/jobmaster/JobManagerSharedServices.java    | 18 ++++++++++++++++--
 .../org/apache/flink/runtime/jobmaster/JobMaster.java  |  3 +--
 .../flink/runtime/dispatcher/DispatcherTest.java       |  6 ++----
 .../dispatcher/TestingJobManagerRunnerFactory.java     |  2 --
 .../flink/runtime/jobmaster/JobManagerRunnerTest.java  |  5 ++---
 .../apache/flink/runtime/jobmaster/JobMasterTest.java  | 14 --------------
 .../TestingJobManagerSharedServicesBuilder.java        | 12 +++++++++++-
 9 files changed, 32 insertions(+), 35 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 2330180..544a438 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -314,7 +314,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 					rpcService,
 					highAvailabilityServices,
 					heartbeatServices,
-					blobServer,
 					jobManagerSharedServices,
 					new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
 					fatalErrorHandler)),
@@ -1024,7 +1023,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
-			BlobServer blobServer,
 			JobManagerSharedServices jobManagerServices,
 			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
 			FatalErrorHandler fatalErrorHandler) throws Exception;
@@ -1044,7 +1042,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				RpcService rpcService,
 				HighAvailabilityServices highAvailabilityServices,
 				HeartbeatServices heartbeatServices,
-				BlobServer blobServer,
 				JobManagerSharedServices jobManagerServices,
 				JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
 				FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -1055,7 +1052,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				rpcService,
 				highAvailabilityServices,
 				heartbeatServices,
-				blobServer,
 				jobManagerServices,
 				jobManagerJobMetricGroupFactory,
 				fatalErrorHandler);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 300462a..c10a535 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -109,7 +108,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
 			final HeartbeatServices heartbeatServices,
-			final BlobServer blobServer,
 			final JobManagerSharedServices jobManagerSharedServices,
 			final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
 			final FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -163,7 +161,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 				slotPoolFactory,
 				jobManagerSharedServices,
 				heartbeatServices,
-				blobServer,
 				jobManagerJobMetricGroupFactory,
 				this,
 				fatalErrorHandler,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
index c1e910c..6c6696f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -36,6 +37,8 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.util.ExceptionUtils;
 
+import javax.annotation.Nonnull;
+
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -60,18 +63,23 @@ public class JobManagerSharedServices {
 
 	private final BackPressureStatsTracker backPressureStatsTracker;
 
+	@Nonnull
+	private final BlobWriter blobWriter;
+
 	public JobManagerSharedServices(
 			ScheduledExecutorService scheduledExecutorService,
 			LibraryCacheManager libraryCacheManager,
 			RestartStrategyFactory restartStrategyFactory,
 			StackTraceSampleCoordinator stackTraceSampleCoordinator,
-			BackPressureStatsTracker backPressureStatsTracker) {
+			BackPressureStatsTracker backPressureStatsTracker,
+			@Nonnull BlobWriter blobWriter) {
 
 		this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
 		this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
 		this.stackTraceSampleCoordinator = checkNotNull(stackTraceSampleCoordinator);
 		this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
+		this.blobWriter = blobWriter;
 	}
 
 	public ScheduledExecutorService getScheduledExecutorService() {
@@ -90,6 +98,11 @@ public class JobManagerSharedServices {
 		return backPressureStatsTracker;
 	}
 
+	@Nonnull
+	public BlobWriter getBlobWriter() {
+		return blobWriter;
+	}
+
 	/**
 	 * Shutdown the {@link JobMaster} services.
 	 *
@@ -171,6 +184,7 @@ public class JobManagerSharedServices {
 			libraryCacheManager,
 			RestartStrategyFactory.createRestartStrategyFactory(config),
 			stackTraceSampleCoordinator,
-			backPressureStatsTracker);
+			backPressureStatsTracker,
+			blobServer);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5a5ca19..90b8b84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -230,7 +230,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			SlotPoolFactory slotPoolFactory,
 			JobManagerSharedServices jobManagerSharedServices,
 			HeartbeatServices heartbeatServices,
-			BlobWriter blobWriter,
 			JobManagerJobMetricGroupFactory jobMetricGroupFactory,
 			OnCompletionActions jobCompletionActions,
 			FatalErrorHandler fatalErrorHandler,
@@ -245,7 +244,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		this.jobGraph = checkNotNull(jobGraph);
 		this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
 		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
-		this.blobWriter = checkNotNull(blobWriter);
+		this.blobWriter = jobManagerSharedServices.getBlobWriter();
 		this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 677ce5a..e10b8bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -697,10 +697,10 @@ public class DispatcherTest extends TestLogger {
 		}
 
 		@Override
-		public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+		public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
 			jobManagerRunnerCreationLatch.run();
 
-			return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+			return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
 		}
 	}
 
@@ -754,7 +754,6 @@ public class DispatcherTest extends TestLogger {
 				RpcService rpcService,
 				HighAvailabilityServices highAvailabilityServices,
 				HeartbeatServices heartbeatServices,
-				BlobServer blobServer,
 				JobManagerSharedServices jobManagerSharedServices,
 				JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
 				FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -769,7 +768,6 @@ public class DispatcherTest extends TestLogger {
 				rpcService,
 				highAvailabilityServices,
 				heartbeatServices,
-				blobServer,
 				jobManagerSharedServices,
 				jobManagerJobMetricGroupFactory,
 				fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 63574cf..30e4af6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -75,7 +74,6 @@ class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFacto
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
-			BlobServer blobServer,
 			JobManagerSharedServices jobManagerSharedServices,
 			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index 08f6fe5..bf2576b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -61,7 +61,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the {@link JobManagerRunner}
+ * Tests for the {@link JobManagerRunner}.
  */
 public class JobManagerRunnerTest extends TestLogger {
 
@@ -138,7 +138,7 @@ public class JobManagerRunnerTest extends TestLogger {
 			rpcService.stopService();
 		}
 	}
-	
+
 	@Test
 	public void testJobCompletion() throws Exception {
 		final JobManagerRunner jobManagerRunner = createJobManagerRunner();
@@ -235,7 +235,6 @@ public class JobManagerRunnerTest extends TestLogger {
 			rpcService,
 			haServices,
 			heartbeatServices,
-			blobServer,
 			jobManagerSharedServices,
 			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 			fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 56d5044..9c51eab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -34,8 +34,6 @@ import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
@@ -193,8 +191,6 @@ public class JobMasterTest extends TestLogger {
 
 	private static HeartbeatServices heartbeatServices;
 
-	private BlobServer blobServer;
-
 	private Configuration configuration;
 
 	private ResourceID jmResourceId;
@@ -232,9 +228,6 @@ public class JobMasterTest extends TestLogger {
 		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
 
 		configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
-		blobServer = new BlobServer(configuration, new VoidBlobStore());
-
-		blobServer.start();
 	}
 
 	@After
@@ -243,10 +236,6 @@ public class JobMasterTest extends TestLogger {
 			testingFatalErrorHandler.rethrowError();
 		}
 
-		if (blobServer != null) {
-			blobServer.close();
-		}
-
 		rpcService.clearGateways();
 	}
 
@@ -283,7 +272,6 @@ public class JobMasterTest extends TestLogger {
 				DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService1),
 				jobManagerSharedServices,
 				heartbeatServices,
-				blobServer,
 				UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 				new NoOpOnCompletionActions(),
 				testingFatalErrorHandler,
@@ -1339,7 +1327,6 @@ public class JobMasterTest extends TestLogger {
 			DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
 			new TestingJobManagerSharedServicesBuilder().build(),
 			heartbeatServices,
-			blobServer,
 			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 			new NoOpOnCompletionActions(),
 			testingFatalErrorHandler,
@@ -1553,7 +1540,6 @@ public class JobMasterTest extends TestLogger {
 			DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
 			jobManagerSharedServices,
 			heartbeatServices,
-			blobServer,
 			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 			new NoOpOnCompletionActions(),
 			testingFatalErrorHandler,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
index 030e4e6..ae20d1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -45,12 +47,15 @@ public class TestingJobManagerSharedServicesBuilder {
 
 	private BackPressureStatsTracker backPressureStatsTracker;
 
+	private BlobWriter blobWriter;
+
 	public TestingJobManagerSharedServicesBuilder() {
 		scheduledExecutorService = TestingUtils.defaultExecutor();
 		libraryCacheManager = mock(LibraryCacheManager.class);
 		restartStrategyFactory = new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
 		stackTraceSampleCoordinator = mock(StackTraceSampleCoordinator.class);
 		backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
+		blobWriter = VoidBlobWriter.getInstance();
 	}
 
 	public TestingJobManagerSharedServicesBuilder setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
@@ -80,12 +85,17 @@ public class TestingJobManagerSharedServicesBuilder {
 
 	}
 
+	public void setBlobWriter(BlobWriter blobWriter) {
+		this.blobWriter = blobWriter;
+	}
+
 	public JobManagerSharedServices build() {
 		return new JobManagerSharedServices(
 			scheduledExecutorService,
 			libraryCacheManager,
 			restartStrategyFactory,
 			stackTraceSampleCoordinator,
-			backPressureStatsTracker);
+			backPressureStatsTracker,
+			blobWriter);
 	}
 }