You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/09/17 02:09:38 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5038]. Use
zeppelin's own dependency resolver in flink interpreter
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 7a72105 [ZEPPELIN-5038]. Use zeppelin's own dependency resolver in flink interpreter
7a72105 is described below
commit 7a721052866590082b7120409aa95bf7c88ca3a6
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sun Sep 13 22:03:08 2020 +0800
[ZEPPELIN-5038]. Use zeppelin's own dependency resolver in flink interpreter
### What is this PR for?
This PR makes the Flink interpreter use Zeppelin's own dependency resolver instead of the interpreter's separate, duplicated resolver implementation. The benefit is the following:
1. Remove code duplication
2. Leverage the capability of adding maven repository
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5038
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3911 from zjffdu/ZEPPELIN-5038 and squashes the following commits:
a4df83828 [Jeff Zhang] [ZEPPELIN-5038]. Use zeppelin's own dependency resolver in flink interpreter
(cherry picked from commit 02935f35fab2944660b116a0d873150b1b7ddd26)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../zeppelin/flink/FlinkScalaInterpreter.scala | 11 +-
.../zeppelin/flink/util/DependencyUtils.scala | 381 ---------------------
2 files changed, 8 insertions(+), 384 deletions(-)
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 3f9f03a..000313e 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -18,7 +18,7 @@
package org.apache.zeppelin.flink
-import java.io.{BufferedReader, File, IOException}
+import java.io.{BufferedReader, File}
import java.net.{URL, URLClassLoader}
import java.nio.file.Files
import java.util.Properties
@@ -46,13 +46,14 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl
import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.module.hive.HiveModule
import org.apache.flink.yarn.cli.FlinkYarnSessionCli
-import org.apache.zeppelin.flink.util.DependencyUtils
+import org.apache.zeppelin.dep.DependencyResolver
import org.apache.zeppelin.flink.FlinkShell._
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterHookRegistry, InterpreterResult}
import org.slf4j.{Logger, LoggerFactory}
+import scala.collection.{JavaConversions, JavaConverters}
import scala.collection.JavaConverters._
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.Completion.ScalaCompleter
@@ -787,7 +788,11 @@ class FlinkScalaInterpreter(val properties: Properties) {
val flinkPackageJars =
if (!StringUtils.isBlank(properties.getProperty("flink.execution.packages", ""))) {
val packages = properties.getProperty("flink.execution.packages")
- DependencyUtils.resolveMavenDependencies(null, packages, null, null, None).split(":").toSeq
+ val dependencyDir = Files.createTempDirectory("zeppelin-flink-dep").toFile
+ val dependencyResolver = new DependencyResolver(dependencyDir.getAbsolutePath)
+ packages.split(",")
+ .flatMap(e => JavaConversions.asScalaBuffer(dependencyResolver.load(e, dependencyDir)))
+ .map(e => e.getAbsolutePath).toSeq
} else {
Seq.empty[String]
}
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala
deleted file mode 100644
index a3303c9..0000000
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink.util
-
-import java.io.{File, IOException}
-import java.text.ParseException
-import java.util.UUID
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.ivy.Ivy
-import org.apache.ivy.core.LogOptions
-import org.apache.ivy.core.module.descriptor._
-import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
-import org.apache.ivy.core.report.ResolveReport
-import org.apache.ivy.core.resolve.ResolveOptions
-import org.apache.ivy.core.retrieve.RetrieveOptions
-import org.apache.ivy.core.settings.IvySettings
-import org.apache.ivy.plugins.matcher.GlobPatternMatcher
-import org.apache.ivy.plugins.repository.file.FileRepository
-import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
-
-object DependencyUtils {
-
- def resolveMavenDependencies(
- packagesExclusions: String,
- packages: String,
- repositories: String,
- ivyRepoPath: String,
- ivySettingsPath: Option[String]): String = {
- val exclusions: Seq[String] =
- if (!StringUtils.isBlank(packagesExclusions)) {
- packagesExclusions.split(",")
- } else {
- Nil
- }
- // Create the IvySettings, either load from file or build defaults
- val ivySettings = ivySettingsPath match {
- case Some(path) =>
- loadIvySettings(path, Option(repositories), Option(ivyRepoPath))
-
- case None =>
- buildIvySettings(Option(repositories), Option(ivyRepoPath))
- }
-
- resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
- }
-
- // Exposed for testing
- var printStream = Console.out
-
- /**
- * Represents a Maven Coordinate
- *
- * @param groupId the groupId of the coordinate
- * @param artifactId the artifactId of the coordinate
- * @param version the version of the coordinate
- */
- case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
- override def toString: String = s"$groupId:$artifactId:$version"
- }
-
- /**
- * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
- * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
- *
- * @param coordinates Comma-delimited string of maven coordinates
- * @return Sequence of Maven coordinates
- */
- def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
- coordinates.split(",").map { p =>
- val splits = p.replace("/", ":").split(":")
- require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
- s"'groupId:artifactId:version'. The coordinate provided is: $p")
- require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
- s"be whitespace. The groupId provided is: ${splits(0)}")
- require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
- s"be whitespace. The artifactId provided is: ${splits(1)}")
- require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
- s"be whitespace. The version provided is: ${splits(2)}")
- new MavenCoordinate(splits(0), splits(1), splits(2))
- }
- }
-
- /** Path of the local Maven cache. */
- private def m2Path: File = {
- new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
- }
-
- /**
- * Extracts maven coordinates from a comma-delimited string
- *
- * @param defaultIvyUserDir The default user path for Ivy
- * @return A ChainResolver used by Ivy to search for and resolve dependencies.
- */
- def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
- // We need a chain resolver if we want to check multiple repositories
- val cr = new ChainResolver
- cr.setName("flink-list")
-
- val localM2 = new IBiblioResolver
- localM2.setM2compatible(true)
- localM2.setRoot(m2Path.toURI.toString)
- localM2.setUsepoms(true)
- localM2.setName("local-m2-cache")
- cr.add(localM2)
-
- val localIvy = new FileSystemResolver
- val localIvyRoot = new File(defaultIvyUserDir, "local")
- localIvy.setLocal(true)
- localIvy.setRepository(new FileRepository(localIvyRoot))
- val ivyPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]", "[revision]",
- "ivys", "ivy.xml").mkString(File.separator)
- localIvy.addIvyPattern(ivyPattern)
- val artifactPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]",
- "[revision]", "[type]s", "[artifact](-[classifier]).[ext]").mkString(File.separator)
- localIvy.addArtifactPattern(artifactPattern)
- localIvy.setName("local-ivy-cache")
- cr.add(localIvy)
-
- // the biblio resolver resolves POM declared dependencies
- val br: IBiblioResolver = new IBiblioResolver
- br.setM2compatible(true)
- br.setUsepoms(true)
- br.setName("central")
- cr.add(br)
-
- cr
- }
-
- /**
- * Output a comma-delimited list of paths for the downloaded jars to be added to the classpath
- *
- * @param artifacts Sequence of dependencies that were resolved and retrieved
- * @param cacheDirectory directory where jars are cached
- * @return a comma-delimited list of paths for the dependencies
- */
- def resolveDependencyPaths(
- artifacts: Array[AnyRef],
- cacheDirectory: File): String = {
- artifacts.map { artifactInfo =>
- val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
- cacheDirectory.getAbsolutePath + File.separator +
- s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
- }.mkString(":")
- }
-
- /** Adds the given maven coordinates to Ivy's module descriptor. */
- def addDependenciesToIvy(
- md: DefaultModuleDescriptor,
- artifacts: Seq[MavenCoordinate],
- ivyConfName: String): Unit = {
- artifacts.foreach { mvn =>
- val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
- val dd = new DefaultDependencyDescriptor(ri, false, false)
- dd.addDependencyConfiguration(ivyConfName, ivyConfName + "(runtime)")
- // scalastyle:off println
- printStream.println(s"${dd.getDependencyId} added as a dependency")
- // scalastyle:on println
- md.addDependency(dd)
- }
- }
-
- /** Add exclusion rules for dependencies already included in the flink-dist */
- def addExclusionRules(
- ivySettings: IvySettings,
- ivyConfName: String,
- md: DefaultModuleDescriptor): Unit = {
- // Add scala exclusion rule
- md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))
- }
-
- /**
- * Build Ivy Settings using options with default resolvers
- *
- * @param remoteRepos Comma-delimited string of remote repositories other than maven central
- * @param ivyPath The path to the local ivy repository
- * @return An IvySettings object
- */
- def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
- val ivySettings: IvySettings = new IvySettings
- processIvyPathArg(ivySettings, ivyPath)
-
- // create a pattern matcher
- ivySettings.addMatcher(new GlobPatternMatcher)
- // create the dependency resolvers
- val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
- ivySettings.addResolver(repoResolver)
- ivySettings.setDefaultResolver(repoResolver.getName)
- processRemoteRepoArg(ivySettings, remoteRepos)
- ivySettings
- }
-
- /**
- * Load Ivy settings from a given filename, using supplied resolvers
- *
- * @param settingsFile Path to Ivy settings file
- * @param remoteRepos Comma-delimited string of remote repositories other than maven central
- * @param ivyPath The path to the local ivy repository
- * @return An IvySettings object
- */
- def loadIvySettings(
- settingsFile: String,
- remoteRepos: Option[String],
- ivyPath: Option[String]): IvySettings = {
- val file = new File(settingsFile)
- require(file.exists(), s"Ivy settings file $file does not exist")
- require(file.isFile(), s"Ivy settings file $file is not a normal file")
- val ivySettings: IvySettings = new IvySettings
- try {
- ivySettings.load(file)
- } catch {
- case e@(_: IOException | _: ParseException) =>
- throw new RuntimeException(s"Failed when loading Ivy settings from $settingsFile", e)
- }
- processIvyPathArg(ivySettings, ivyPath)
- processRemoteRepoArg(ivySettings, remoteRepos)
- ivySettings
- }
-
- /* Set ivy settings for location of cache, if option is supplied */
- private def processIvyPathArg(ivySettings: IvySettings, ivyPath: Option[String]): Unit = {
- ivyPath.filterNot(_.trim.isEmpty).foreach { alternateIvyDir =>
- ivySettings.setDefaultIvyUserDir(new File(alternateIvyDir))
- ivySettings.setDefaultCache(new File(alternateIvyDir, "cache"))
- }
- }
-
- /* Add any optional additional remote repositories */
- private def processRemoteRepoArg(ivySettings: IvySettings, remoteRepos: Option[String]): Unit = {
- remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { repositoryList =>
- val cr = new ChainResolver
- cr.setName("user-list")
-
- // add current default resolver, if any
- Option(ivySettings.getDefaultResolver).foreach(cr.add)
-
- // add additional repositories, last resolution in chain takes precedence
- repositoryList.zipWithIndex.foreach { case (repo, i) =>
- val brr: IBiblioResolver = new IBiblioResolver
- brr.setM2compatible(true)
- brr.setUsepoms(true)
- brr.setRoot(repo)
- brr.setName(s"repo-${i + 1}")
- cr.add(brr)
- // scalastyle:off println
- printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
- // scalastyle:on println
- }
-
- ivySettings.addResolver(cr)
- ivySettings.setDefaultResolver(cr.getName)
- }
- }
-
- /** A nice function to use in tests as well. Values are dummy strings. */
- def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
- // Include UUID in module name, so multiple clients resolving maven coordinate at the same time
- // do not modify the same resolution file concurrently.
- ModuleRevisionId.newInstance("org.apache.flink",
- s"flink-parent-${UUID.randomUUID.toString}",
- "1.0"))
-
- private def clearIvyResolutionFiles(
- mdId: ModuleRevisionId,
- ivySettings: IvySettings,
- ivyConfName: String): Unit = {
- val currentResolutionFiles = Seq(
- s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml",
- s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml",
- s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties"
- )
- currentResolutionFiles.foreach { filename =>
- new File(ivySettings.getDefaultCache, filename).delete()
- }
- }
-
- /**
- * Resolves any dependencies that were supplied through maven coordinates
- *
- * @param coordinates Comma-delimited string of maven coordinates
- * @param ivySettings An IvySettings containing resolvers to use
- * @param exclusions Exclusions to apply when resolving transitive dependencies
- * @return The comma-delimited path to the jars of the given maven artifacts including their
- * transitive dependencies
- */
- def resolveMavenCoordinates(
- coordinates: String,
- ivySettings: IvySettings,
- exclusions: Seq[String] = Nil,
- isTest: Boolean = false): String = {
- if (coordinates == null || coordinates.trim.isEmpty) {
- ""
- } else {
- val sysOut = System.out
- try {
- // To prevent ivy from logging to system out
- System.setOut(printStream)
- val artifacts = extractMavenCoordinates(coordinates)
- val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars")
- // scalastyle:off println
- printStream.println(
- s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
- printStream.println(s"The jars for the packages stored in: $packagesDirectory")
- // scalastyle:on println
-
- val ivy = Ivy.newInstance(ivySettings)
- // Set resolve options to download transitive dependencies as well
- val resolveOptions = new ResolveOptions
- resolveOptions.setTransitive(true)
- val retrieveOptions = new RetrieveOptions
- // Turn downloading and logging off for testing
- if (isTest) {
- resolveOptions.setDownload(false)
- resolveOptions.setLog(LogOptions.LOG_QUIET)
- retrieveOptions.setLog(LogOptions.LOG_QUIET)
- } else {
- resolveOptions.setDownload(true)
- }
-
- // Default configuration name for ivy
- val ivyConfName = "default"
-
- // A Module descriptor must be specified. Entries are dummy strings
- val md = getModuleDescriptor
-
- md.setDefaultConf(ivyConfName)
-
- // Add exclusion rules for Flink and Scala Library
- addExclusionRules(ivySettings, ivyConfName, md)
- // add all supplied maven artifacts as dependencies
- addDependenciesToIvy(md, artifacts, ivyConfName)
- exclusions.foreach { e =>
- md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
- }
- // resolve dependencies
- val rr: ResolveReport = ivy.resolve(md, resolveOptions)
- if (rr.hasError) {
- throw new RuntimeException(rr.getAllProblemMessages.toString)
- }
- // retrieve all resolved dependencies
- ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
- packagesDirectory.getAbsolutePath + File.separator +
- "[organization]_[artifact]-[revision](-[classifier]).[ext]",
- retrieveOptions.setConfs(Array(ivyConfName)))
- val paths = resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
- val mdId = md.getModuleRevisionId
- clearIvyResolutionFiles(mdId, ivySettings, ivyConfName)
- paths
- } finally {
- System.setOut(sysOut)
- }
- }
- }
-
- private def createExclusion(
- coords: String,
- ivySettings: IvySettings,
- ivyConfName: String): ExcludeRule = {
- val c = extractMavenCoordinates(coords)(0)
- val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*")
- val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
- rule.addConfiguration(ivyConfName)
- rule
- }
-
-}