You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/21 09:44:24 UTC

flink git commit: [FLINK-7262][blob] remove the unused FallbackLibraryCacheManager

Repository: flink
Updated Branches:
  refs/heads/master a1d483179 -> 6c5ecc2f4


[FLINK-7262][blob] remove the unused FallbackLibraryCacheManager

This class was used almost exclusively in unit tests, where it was not really
needed either. The code path inside TaskManager that instantiated it was also dead.

This closes #4403.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c5ecc2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c5ecc2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c5ecc2f

Branch: refs/heads/master
Commit: 6c5ecc2f4a0a6888d8c4517bbe050fb2553faf98
Parents: a1d4831
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jul 25 14:49:58 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Oct 21 11:44:09 2017 +0200

----------------------------------------------------------------------
 .../FallbackLibraryCacheManager.java            | 62 --------------------
 .../flink/runtime/taskmanager/TaskManager.scala | 39 ++++++------
 .../runtime/util/JvmExitOnFatalErrorTest.java   |  9 ++-
 .../runtime/tasks/BlockingCheckpointsTest.java  |  9 ++-
 .../tasks/InterruptSensitiveRestoreTest.java    |  7 ++-
 .../tasks/StreamTaskTerminationTest.java        |  7 ++-
 6 files changed, 39 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c5ecc2f/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
deleted file mode 100644
index 6564e9e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.execution.librarycache;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URL;
-import java.util.Collection;
-
-public class FallbackLibraryCacheManager implements LibraryCacheManager {
-
-	private static Logger LOG = LoggerFactory.getLogger(FallbackLibraryCacheManager.class);
-
-	@Override
-	public ClassLoader getClassLoader(JobID id) {
-		return getClass().getClassLoader();
-	}
-
-	@Override
-	public void registerJob(JobID id, Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) {
-		LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
-	}
-
-	@Override
-	public void registerTask(JobID id, ExecutionAttemptID execution, Collection<PermanentBlobKey> requiredJarFiles,
-		Collection<URL> requiredClasspaths) {
-		LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
-	}
-
-	@Override
-	public void unregisterJob(JobID id) {
-		LOG.warn("FallbackLibraryCacheManager does not book keeping of job IDs.");
-	}
-
-	@Override
-	public void unregisterTask(JobID id, ExecutionAttemptID execution) {
-		LOG.warn("FallbackLibraryCacheManager does not book keeping of job IDs.");
-	}
-
-	@Override
-	public void shutdown() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5ecc2f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c370725..0f79d51 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.concurrent.{Executors, FutureUtils}
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
 import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
+import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, PartitionInfo}
 import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
@@ -955,31 +955,26 @@ class TaskManager(
     }
 
     // start a blob service, if a blob server is specified
-    if (blobPort > 0) {
-      val jmHost = jobManager.path.address.host.getOrElse("localhost")
-      val address = new InetSocketAddress(jmHost, blobPort)
+    val jmHost = jobManager.path.address.host.getOrElse("localhost")
+    val address = new InetSocketAddress(jmHost, blobPort)
 
-      log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.")
+    log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.")
 
-      try {
-        val blobcache = new BlobCacheService(
-          address,
-          config.getConfiguration(),
-          highAvailabilityServices.createBlobStore())
-        blobCache = Option(blobcache)
-        libraryCacheManager = Some(
-          new BlobLibraryCacheManager(
+    try {
+      val blobcache = new BlobCacheService(
+        address,
+        config.getConfiguration(),
+        highAvailabilityServices.createBlobStore())
+      blobCache = Option(blobcache)
+      libraryCacheManager = Some(
+        new BlobLibraryCacheManager(
             blobcache.getPermanentBlobService, config.getClassLoaderResolveOrder()))
-      }
-      catch {
-        case e: Exception =>
-          val message = "Could not create BLOB cache or library cache."
-          log.error(message, e)
-          throw new RuntimeException(message, e)
-      }
     }
-    else {
-      libraryCacheManager = Some(new FallbackLibraryCacheManager)
+    catch {
+      case e: Exception =>
+        val message = "Could not create BLOB cache or library cache."
+        log.error(message, e)
+        throw new RuntimeException(message, e)
     }
     
     taskManagerMetricGroup = 

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5ecc2f/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 57a3831..82b6b29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -34,7 +34,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
@@ -182,8 +183,10 @@ public class JvmExitOnFatalErrorTest {
 						new NoOpTaskManagerActions(),
 						new NoOpInputSplitProvider(),
 						new NoOpCheckpointResponder(),
-					blobService,
-						new FallbackLibraryCacheManager(),
+						blobService,
+						new BlobLibraryCacheManager(
+							blobService.getPermanentBlobService(),
+							FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
 						new FileCache(tmInfo.getTmpDirectories()),
 						tmInfo,
 						new UnregisteredTaskMetricsGroup(),

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5ecc2f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index b43f30a..5a38615 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -37,7 +37,8 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
@@ -160,8 +161,10 @@ public class BlockingCheckpointsTest {
 				mock(TaskManagerActions.class),
 				mock(InputSplitProvider.class),
 				mock(CheckpointResponder.class),
-			blobService,
-				new FallbackLibraryCacheManager(),
+				blobService,
+				new BlobLibraryCacheManager(
+					blobService.getPermanentBlobService(),
+					FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
 				new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 				new TestingTaskManagerRuntimeInfo(),
 				new UnregisteredTaskMetricsGroup(),

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5ecc2f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 19823ec..d049e31 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -34,7 +34,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
@@ -262,7 +263,9 @@ public class InterruptSensitiveRestoreTest {
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
 			blobService,
-			new FallbackLibraryCacheManager(),
+			new BlobLibraryCacheManager(
+				blobService.getPermanentBlobService(),
+				FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
 			new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 			new TestingTaskManagerRuntimeInfo(),
 			new UnregisteredTaskMetricsGroup(),

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5ecc2f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index a989fa4..7eb2655 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -34,7 +34,8 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
@@ -158,7 +159,9 @@ public class StreamTaskTerminationTest extends TestLogger {
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
 			blobService,
-			new FallbackLibraryCacheManager(),
+			new BlobLibraryCacheManager(
+				blobService.getPermanentBlobService(),
+				FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
 			mock(FileCache.class),
 			taskManagerRuntimeInfo,
 			new UnregisteredTaskMetricsGroup(),