Posted to commits@streampark.apache.org by be...@apache.org on 2022/10/11 15:47:42 UTC

[incubator-streampark] branch dev updated: [Bug] after flink 1.15 error when verify SQL script bug fix (#1738)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 45e03a07d [Bug] after flink 1.15 error when verify SQL script bug fix (#1738)
45e03a07d is described below

commit 45e03a07ddaaea335ce753733ebd44bb4c69bc33
Author: monrg <45...@users.noreply.github.com>
AuthorDate: Tue Oct 11 23:47:35 2022 +0800

    [Bug] after flink 1.15 error when verify SQL script bug fix (#1738)
    
    * [Bug] after flink 1.15 error when verify SQL script bug fix
    
    Co-authored-by: wangqingrong <wa...@gitv.cn>
---
 .../core/service/impl/FlinkSqlServiceImpl.java     |   9 +-
 .../streampark/flink/proxy/FlinkShimsProxy.scala   | 121 ++++++++++++++-------
 2 files changed, 86 insertions(+), 44 deletions(-)
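
Background for the change: starting with Flink 1.15, the full table planner (flink-table-planner) is no longer shipped in flink/lib; the distribution carries flink-table-planner-loader in lib/ and the planner jar itself in opt/. A ClassLoader built from flink/lib alone therefore no longer sees every class needed to parse and validate SQL, which is what broke verification. The following is a minimal sketch of the idea behind the fix, assuming FLINK_HOME points at a Flink distribution; the object and helper names here are illustrative, and a plain URLClassLoader stands in for StreamPark's ChildFirstClassLoader (the real implementation is getVerifySqlLibClassLoader in the Scala diff below).

    import java.io.File
    import java.net.{URL, URLClassLoader}

    // Illustrative only: gather flink-table* jars from both $FLINK_HOME/lib and
    // $FLINK_HOME/opt (where Flink 1.15 keeps the full table planner), then
    // build a dedicated loader for SQL verification.
    object VerifySqlLoaderSketch {

      private def tableJars(flinkHome: String, childDir: String): List[URL] = {
        val dir = new File(flinkHome, childDir)
        require(dir.isDirectory, s"$dir is not a directory")
        dir.listFiles().filter(_.getName.startsWith("flink-table")).map(_.toURI.toURL).toList
      }

      def main(args: Array[String]): Unit = {
        val flinkHome = sys.env("FLINK_HOME") // assumes FLINK_HOME is set
        val urls = tableJars(flinkHome, "lib") ++ tableJars(flinkHome, "opt")
        val loader = new URLClassLoader(urls.toArray, Thread.currentThread().getContextClassLoader)
        urls.foreach(u => println(s"verify-sql classpath entry: $u"))
      }
    }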

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index 5526cf54d..e00738f8e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -45,7 +45,6 @@ import org.springframework.transaction.annotation.Transactional;
 
 import java.lang.reflect.Method;
 import java.util.List;
-import java.util.function.Function;
 
 @Slf4j
 @Service
@@ -117,7 +116,7 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> i
 
     @Override
     public List<FlinkSql> history(Application application) {
-        LambdaQueryWrapper<FlinkSql> wrapper = new LambdaQueryWrapper();
+        LambdaQueryWrapper<FlinkSql> wrapper = new LambdaQueryWrapper<>();
         wrapper.eq(FlinkSql::getAppId, application.getId())
             .orderByDesc(FlinkSql::getVersion);
 
@@ -185,7 +184,7 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> i
     @Override
     public FlinkSqlValidationResult verifySql(String sql, Long versionId) {
         FlinkEnv flinkEnv = flinkEnvService.getById(versionId);
-        return FlinkShimsProxy.proxy(flinkEnv.getFlinkVersion(), (Function<ClassLoader, FlinkSqlValidationResult>) classLoader -> {
+        return FlinkShimsProxy.proxyVerifySql(flinkEnv.getFlinkVersion(), classLoader -> {
             try {
                 Class<?> clazz = classLoader.loadClass("org.apache.streampark.flink.core.FlinkSqlValidator");
                 Method method = clazz.getDeclaredMethod("verifySql", String.class);
@@ -201,8 +200,4 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> i
             return null;
         });
     }
-
-    private boolean isFlinkSqlBacked(FlinkSql sql) {
-        return backUpService.isFlinkSqlBacked(sql.getAppId(), sql.getId());
-    }
 }
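
The hunk above elides the actual invocation; the pattern the service runs under the proxied ClassLoader is roughly the following sketch, assuming verifySql is a static method on FlinkSqlValidator (the class and method names come from the diff, the rest is illustrative):

    // Reflective call performed under the proxied ClassLoader; the null
    // receiver assumes a static verifySql, and the raw AnyRef return skips
    // the cross-ClassLoader result handling the real service still does.
    def reflectiveVerify(classLoader: ClassLoader, sql: String): AnyRef = {
      val clazz  = classLoader.loadClass("org.apache.streampark.flink.core.FlinkSqlValidator")
      val method = clazz.getDeclaredMethod("verifySql", classOf[String])
      method.setAccessible(true)
      method.invoke(null, sql)
    }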
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 46b8ce8e8..72dcd10f2 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -42,6 +42,8 @@ object FlinkShimsProxy extends Logger {
 
   private[this] val SHIMS_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]()
 
+  private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]()
+
   private[this] def getFlinkShimsResourcePattern(flinkLargeVersion: String) =
     Pattern.compile(
       s"flink-(.*)-$flinkLargeVersion(.*).jar",
@@ -76,64 +78,109 @@ object FlinkShimsProxy extends Logger {
    */
   def proxy[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T = {
     val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
-    ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, new Supplier[T]() {
-      override def get(): T = func.apply(shimsClassLoader)
+    ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
+  }
+
+  // Flink 1.12, 1.13~1.14, and 1.15 ship the SQL-parsing classes in different artifacts,
+  // so all flink-table dependencies must be loaded to stay compatible across versions.
+  def getVerifySqlLibClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
+    logInfo(s"add  verify sql lib,flink version:  $flinkVersion")
+    VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate(s"${flinkVersion.fullVersion}", {
+      val getFlinkTable: File => Boolean = _.getName.startsWith("flink-table")
+      // 1) flink/lib/flink-table*
+      val libTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "lib", getFlinkTable)
+
+      // 2) since Flink 1.15 the full table planner ships in flink/opt, so flink/opt/flink-table* must be added as well
+      val optTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "opt", getFlinkTable)
+      val shimsUrls = ListBuffer[URL](libTableURL ++ optTableURL: _*)
+
+      // 3) add only the streampark-flink-shims jar
+      addShimsUrls(flinkVersion, file => {
+        if (file != null && file.getName.startsWith("streampark-flink-shims")) {
+          shimsUrls += file.toURI.toURL
+        }
+      })
+      new ChildFirstClassLoader(
+        shimsUrls.toArray,
+        Thread.currentThread().getContextClassLoader,
+        getFlinkShimsResourcePattern(flinkVersion.majorVersion)
+      )
     })
   }
 
-  private[this] def getFlinkShimsClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
+  def addShimsUrls(flinkVersion: FlinkVersion, addShimUrl: File => Unit): Unit = {
+    val appHome = System.getProperty(ConfigConst.KEY_APP_HOME)
+    require(appHome != null, String.format("%s is not found in system properties.", ConfigConst.KEY_APP_HOME))
+
+    val libPath = new File(s"$appHome/lib")
+    require(libPath.exists())
     val majorVersion = flinkVersion.majorVersion
     val scalaVersion = flinkVersion.scalaVersion
-    logInfo(flinkVersion.toString)
+    val streamParkMatcher = getStreamParkLibPattern(scalaVersion)
+
+    libPath.listFiles().foreach((jar: File) => {
+      try {
+        val shimsMatcher = SHIMS_PATTERN.matcher(jar.getName)
+        if (shimsMatcher.matches()) {
+          if (majorVersion == shimsMatcher.group(1) && scalaVersion == shimsMatcher.group(2)) {
+            addShimUrl(jar)
+          }
+        } else {
+          if (INCLUDE_PATTERN.matcher(jar.getName).matches()) {
+            addShimUrl(jar)
+            logInfo(s"include jar lib: ${jar.getName}")
+          }
+          if (streamParkMatcher.matcher(jar.getName).matches()) {
+            addShimUrl(jar)
+            logInfo(s"include streampark lib: ${jar.getName}")
+          }
+        }
+      } catch {
+        case e: Exception => e.printStackTrace()
+      }
+    })
+  }
+
+  /**
+   * Run the given function under a ClassLoader that can verify Flink SQL
+   * for the given Flink version.
+   *
+   * @param flinkVersion the target Flink version
+   * @param func         the function to execute under the verify-sql ClassLoader
+   * @tparam T           the result type of the function
+   * @return the result of applying func
+   */
+  def proxyVerifySql[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T = {
+    val shimsClassLoader = getVerifySqlLibClassLoader(flinkVersion)
+    ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
+  }
+
+  private[this] def getFlinkShimsClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
+    logInfo(s"add flink shims urls classloader,flink version: $flinkVersion")
 
     SHIMS_CLASS_LOADER_CACHE.getOrElseUpdate(s"${flinkVersion.fullVersion}", {
       // 1) flink/lib
-      val libURL = getFlinkHomeLib(flinkVersion.flinkHome)
+      val libURL = getFlinkHomeLib(flinkVersion.flinkHome, "lib", !_.getName.startsWith("log4j"))
       val shimsUrls = ListBuffer[URL](libURL: _*)
 
-      // 2) shims jar
-      val appHome = System.getProperty(ConfigConst.KEY_APP_HOME)
-      require(appHome != null, String.format("%s is not found on System env.", ConfigConst.KEY_APP_HOME))
-
-      val libPath = new File(s"$appHome/lib")
-      require(libPath.exists())
-
-      val streamParkMatcher = getStreamParkLibPattern(scalaVersion)
-
-      libPath.listFiles().foreach(jar => {
-        try {
-          val shimsMatcher = SHIMS_PATTERN.matcher(jar.getName)
-          if (shimsMatcher.matches()) {
-            if (majorVersion == shimsMatcher.group(1) && scalaVersion == shimsMatcher.group(2)) {
-              shimsUrls += jar.toURI.toURL
-            }
-          } else {
-            if (INCLUDE_PATTERN.matcher(jar.getName).matches()) {
-              shimsUrls += jar.toURI.toURL
-              logInfo(s"include jar lib: ${jar.getName}")
-            }
-            if (streamParkMatcher.matcher(jar.getName).matches()) {
-              shimsUrls += jar.toURI.toURL
-              logInfo(s"include streampark lib: ${jar.getName}")
-            }
-          }
-        } catch {
-          case e: Exception => e.printStackTrace()
+      // 2) add all shims jar
+      addShimsUrls(flinkVersion, file => {
+        if (file != null) {
+          shimsUrls += file.toURI.toURL
         }
       })
 
       new ChildFirstClassLoader(
         shimsUrls.toArray,
         Thread.currentThread().getContextClassLoader,
-        getFlinkShimsResourcePattern(majorVersion)
+        getFlinkShimsResourcePattern(flinkVersion.majorVersion)
       )
     })
   }
 
-  private[this] def getFlinkHomeLib(flinkHome: String): List[URL] = {
-    val file = new File(flinkHome, "lib")
+  private[this] def getFlinkHomeLib(flinkHome: String, childDir: String, filterFun: File => Boolean): List[URL] = {
+    val file = new File(flinkHome, childDir)
     require(file.isDirectory, s"FLINK_HOME $file does not exist")
-    file.listFiles.filter(!_.getName.startsWith("log4j")).map(_.toURI.toURL).toList
+    file.listFiles.filter(filterFun).map(_.toURI.toURL).toList
   }
 
   @throws[Exception]
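
For readers unfamiliar with ChildFirstClassLoader: it is what lets the jars resolved above shadow classes already on the console's classpath, so each Flink version's own table classes win. A rough illustration of that delegation order follows; this is not StreamPark's actual implementation (which also special-cases resources matching the shims pattern), and a production loader must still delegate java.* classes to the parent:

    import java.net.{URL, URLClassLoader}

    // Child-first delegation sketch: try the supplied jars before the parent,
    // falling back to the parent only when the class is not found locally.
    class ChildFirstSketch(urls: Array[URL], parent: ClassLoader)
        extends URLClassLoader(urls, parent) {

      override protected def loadClass(name: String, resolve: Boolean): Class[_] =
        getClassLoadingLock(name).synchronized {
          val existing = findLoadedClass(name)
          val clazz =
            if (existing != null) existing
            else {
              try findClass(name) // child (flink + shims jars) first
              catch { case _: ClassNotFoundException => super.loadClass(name, resolve) }
            }
          if (resolve) resolveClass(clazz)
          clazz
        }
    }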