You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@kyuubi.apache.org by "pan3793 (via GitHub)" <gi...@apache.org> on 2023/04/12 03:32:04 UTC

[GitHub] [kyuubi] pan3793 commented on a diff in pull request #4368: [WIP][KYUUBI #4367] Support Flink 1.17

pan3793 commented on code in PR #4368:
URL: https://github.com/apache/kyuubi/pull/4368#discussion_r1163547576


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala:
##########
@@ -62,47 +70,100 @@ object FlinkEngineUtils extends Logging {
   def isFlinkVersionEqualTo(targetVersionString: String): Boolean =
     SemanticVersion(EnvironmentInformation.getVersion).isVersionEqualTo(targetVersionString)
 
-  def parseCliOptions(args: Array[String]): CliOptions = {
-    val (mode, modeArgs) =
-      if (args.isEmpty || args(0).startsWith("-")) (MODE_EMBEDDED, args)
-      else (args(0), args.drop(1))
-    val options = parseEmbeddedModeClient(modeArgs)
-    if (mode == MODE_EMBEDDED) {
-      if (options.isPrintHelp) {
-        printHelpEmbeddedModeClient()
+  /**
+   * Copied and modified from [[org.apache.flink.table.client.cli.CliOptionsParser]]
+   * to avoid loading flink-python classes which we doesn't support yet.
+   */
+  private def discoverDependencies(
+      jars: java.util.List[URL],
+      libraries: java.util.List[URL]): java.util.List[URL] = {
+    val dependencies: java.util.List[URL] = new java.util.ArrayList[URL]
+    try { // find jar files
+      for (url <- jars) {
+        JarUtils.checkJarFile(url)
+        dependencies.add(url)
       }
-      options
+      // find jar files in library directories
+      for (libUrl <- libraries) {
+        val dir: File = new File(libUrl.toURI)
+        if (!dir.isDirectory) throw new SqlClientException("Directory expected: " + dir)
+        else if (!dir.canRead) throw new SqlClientException("Directory cannot be read: " + dir)
+        val files: Array[File] = dir.listFiles
+        if (files == null) throw new SqlClientException("Directory cannot be read: " + dir)
+        for (f <- files) { // only consider jars
+          if (f.isFile && f.getAbsolutePath.toLowerCase.endsWith(".jar")) {
+            val url: URL = f.toURI.toURL
+            JarUtils.checkJarFile(url)
+            dependencies.add(url)
+          }
+        }
+      }
+    } catch {
+      case e: Exception =>
+        throw new SqlClientException("Could not load all required JAR files.", e)
+    }
+    dependencies
+  }
+
+  def getDefaultContext(
+      args: Array[String],
+      flinkConf: Configuration,
+      flinkConfDir: String): DefaultContext = {
+    val parser = new DefaultParser
+    val line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true)
+    val jars: java.util.List[URL] = Option(checkUrls(line, CliOptionsParser.OPTION_JAR))
+      .getOrElse(Collections.emptyList())
+    val libDirs: java.util.List[URL] = Option(checkUrls(line, CliOptionsParser.OPTION_LIBRARY))
+      .getOrElse(Collections.emptyList())
+    val dependencies: java.util.List[URL] = discoverDependencies(jars, libDirs)
+    if (FlinkEngineUtils.isFlinkVersionAtMost("1.16")) {

Review Comment:
   use exact match



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org