You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/01 10:06:45 UTC

[12/16] flink git commit: [FLINK-6655] Add validateAndNormalizeUri method to MemoryArchivist

[FLINK-6655] Add validateAndNormalizeUri method to MemoryArchivist

This closes #4156.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39562691
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39562691
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39562691

Branch: refs/heads/master
Commit: 39562691b160e3061794ac6605b5a9c1f031b548
Parents: 3f0ac26
Author: zhangminglei <zm...@163.com>
Authored: Tue Jun 27 21:26:18 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:04:05 2017 +0200

----------------------------------------------------------------------
 .../runtime/jobmanager/MemoryArchivist.scala    | 66 ++++++++++++++++----
 1 file changed, 55 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39562691/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index d83f2cd..327e2a3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.jobmanager
 
+import java.io.IOException
+import java.net.URI
 import java.util
 
 import akka.actor.ActorRef
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.Path
+import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, Executio
 import org.apache.flink.runtime.history.FsJobArchivist
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
 
 import scala.collection.mutable
 import scala.concurrent.future
@@ -86,7 +86,7 @@ class MemoryArchivist(
   }
 
   override def handleMessage: Receive = {
-    
+
     /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) =>
       // Keep lru order in case we override a graph (from multiple job submission in one session).
@@ -109,7 +109,7 @@ class MemoryArchivist(
       trimHistory()
 
     case msg : InfoMessage => handleWebServerInfoMessage(msg, sender())
-      
+
     case RequestArchivedJob(jobID: JobID) =>
       val graph = graphs.get(jobID)
       sender ! decorateMessage(ArchivedJob(graph))
@@ -165,7 +165,7 @@ class MemoryArchivist(
     throw new RuntimeException("Received unknown message " + message)
   }
 
-  
+
   private def handleWebServerInfoMessage(message: InfoMessage, theSender: ActorRef): Unit = {
     message match {
       case _ : RequestJobsOverview =>
@@ -175,7 +175,7 @@ class MemoryArchivist(
         catch {
           case t: Throwable => log.error("Exception while creating the jobs overview", t)
         }
-  
+
       case _ : RequestJobsWithIDsOverview =>
         try {
           sender ! decorateMessage(createJobsWithIDsOverview())
@@ -188,7 +188,7 @@ class MemoryArchivist(
         val details = graphs.values.map {
           v => WebMonitorUtils.createDetailsForJob(v)
         }.toArray[JobDetails]
-        
+
         theSender ! decorateMessage(new MultipleJobsDetails(null, details))
     }
   }
@@ -198,7 +198,7 @@ class MemoryArchivist(
     // so we aren't archiving it yet.
     if (archivePath.isDefined && graph.getState.isGloballyTerminalState) {
       try {
-        val p = FsStateBackend.validateAndNormalizeUri(archivePath.get.toUri)
+        val p = validateAndNormalizeUri(archivePath.get.toUri)
         future {
           try {
             FsJobArchivist.archiveJob(p, graph)
@@ -217,7 +217,7 @@ class MemoryArchivist(
   // --------------------------------------------------------------------------
   //  Request Responses
   // --------------------------------------------------------------------------
-  
+
   private def createJobsOverview() : JobsOverview = {
     new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
   }
@@ -239,7 +239,7 @@ class MemoryArchivist(
 
     new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
   }
-  
+
   // --------------------------------------------------------------------------
   //  Utilities
   // --------------------------------------------------------------------------
@@ -255,4 +255,48 @@ class MemoryArchivist(
       graphs.remove(jobID)
     }
   }
+
+  /**
+    * Checks the archive path URI and converts it to a Path. The URI must have a scheme, a path
+    * that is neither empty nor the root directory, and a scheme accepted by
+    *
+    * FileSystem.isFlinkSupportedScheme. No authority normalization is performed: the returned
+    * Path is built directly from the given URI.
+    *
+    * @param archivePathUri The URI to check and convert.
+    * @return the given URI as a Path.
+    *
+    * @throws IllegalArgumentException Thrown, if the URI misses scheme or path, points to the
+    *                                  root directory, or uses a scheme Flink does not support.
+    * @throws IOException Declared only; none of the checks below appear to raise it.
+    */
+  @throws[IOException]
+  private def validateAndNormalizeUri(archivePathUri: URI): Path = {
+    val scheme = archivePathUri.getScheme
+    val path = archivePathUri.getPath
+
+    // Validity checks: scheme and path must be present and the path must not be the root.
+    if (scheme == null) {
+      throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+        "Please specify the file system scheme explicitly in the URI: " + archivePathUri)
+    }
+
+    if (path == null) {
+      throw new IllegalArgumentException("The path to store the job archives is null. " +
+        "Please specify a directory path for storing job archives. and the URI is: " +
+        archivePathUri)
+    }
+
+    if (path.length == 0 || path == "/") {
+      throw new IllegalArgumentException("Cannot use the root directory for storing job archives.")
+    }
+
+    if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
+      // Schemes not recognized by FileSystem.isFlinkSupportedScheme are rejected here.
+      // NOTE(review): file systems outside Flink's own list are thus refused — confirm intended.
+      throw new IllegalArgumentException("No file system found with scheme " + scheme
+        + ", referenced in file URI '" + archivePathUri.toString + "'.")
+    }
+    new Path(archivePathUri)
+  }
 }