You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/10/11 15:47:42 UTC
[incubator-streampark] branch dev updated: [Bug] after flink 1.15 error when verify SQL script bug fix (#1738)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 45e03a07d [Bug] after flink 1.15 error when verify SQL script bug fix (#1738)
45e03a07d is described below
commit 45e03a07ddaaea335ce753733ebd44bb4c69bc33
Author: monrg <45...@users.noreply.github.com>
AuthorDate: Tue Oct 11 23:47:35 2022 +0800
[Bug] after flink 1.15 error when verify SQL script bug fix (#1738)
* [Bug] after flink 1.15 error when verify SQL script bug fix
Co-authored-by: wangqingrong <wa...@gitv.cn>
---
.../core/service/impl/FlinkSqlServiceImpl.java | 9 +-
.../streampark/flink/proxy/FlinkShimsProxy.scala | 121 ++++++++++++++-------
2 files changed, 86 insertions(+), 44 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index 5526cf54d..e00738f8e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -45,7 +45,6 @@ import org.springframework.transaction.annotation.Transactional;
import java.lang.reflect.Method;
import java.util.List;
-import java.util.function.Function;
@Slf4j
@Service
@@ -117,7 +116,7 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> i
@Override
public List<FlinkSql> history(Application application) {
- LambdaQueryWrapper<FlinkSql> wrapper = new LambdaQueryWrapper();
+ LambdaQueryWrapper<FlinkSql> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(FlinkSql::getAppId, application.getId())
.orderByDesc(FlinkSql::getVersion);
@@ -185,7 +184,7 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> i
@Override
public FlinkSqlValidationResult verifySql(String sql, Long versionId) {
FlinkEnv flinkEnv = flinkEnvService.getById(versionId);
- return FlinkShimsProxy.proxy(flinkEnv.getFlinkVersion(), (Function<ClassLoader, FlinkSqlValidationResult>) classLoader -> {
+ return FlinkShimsProxy.proxyVerifySql(flinkEnv.getFlinkVersion(), classLoader -> {
try {
Class<?> clazz = classLoader.loadClass("org.apache.streampark.flink.core.FlinkSqlValidator");
Method method = clazz.getDeclaredMethod("verifySql", String.class);
@@ -201,8 +200,4 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> i
return null;
});
}
-
- private boolean isFlinkSqlBacked(FlinkSql sql) {
- return backUpService.isFlinkSqlBacked(sql.getAppId(), sql.getId());
- }
}
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 46b8ce8e8..72dcd10f2 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -42,6 +42,8 @@ object FlinkShimsProxy extends Logger {
private[this] val SHIMS_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]()
+ private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]()
+
private[this] def getFlinkShimsResourcePattern(flinkLargeVersion: String) =
Pattern.compile(
s"flink-(.*)-$flinkLargeVersion(.*).jar",
@@ -76,64 +78,109 @@ object FlinkShimsProxy extends Logger {
*/
def proxy[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T = {
val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
- ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, new Supplier[T]() {
- override def get(): T = func.apply(shimsClassLoader)
+ ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
+ }
+
+ // flink 1.12 1.13~1.14 1.15 parseSql class exist in different dependencies,
+ //need to load all flink-table dependencies compatible with different versions
+ def getVerifySqlLibClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
+ logInfo(s"add verify sql lib,flink version: $flinkVersion")
+ VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate(s"${flinkVersion.fullVersion}", {
+ val getFlinkTable: File => Boolean = _.getName.startsWith("flink-table")
+ // 1) flink/lib/flink-table*
+ val libTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "lib", getFlinkTable)
+
+ // 2) After version 1.15 need add flink/opt/flink-table*
+ val optTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "opt", getFlinkTable)
+ val shimsUrls = ListBuffer[URL](libTableURL ++ optTableURL: _*)
+
+ // 3) add only streampark shims jar
+ addShimsUrls(flinkVersion, file => {
+ if (file != null && file.getName.startsWith("streampark-flink-shims")) {
+ shimsUrls += file.toURI.toURL
+ }
+ })
+ new ChildFirstClassLoader(
+ shimsUrls.toArray,
+ Thread.currentThread().getContextClassLoader,
+ getFlinkShimsResourcePattern(flinkVersion.majorVersion)
+ )
})
}
- private[this] def getFlinkShimsClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
+ def addShimsUrls(flinkVersion: FlinkVersion, addShimUrl: File => Unit): Unit = {
+ val appHome = System.getProperty(ConfigConst.KEY_APP_HOME)
+ require(appHome != null, String.format("%s is not found on System env.", ConfigConst.KEY_APP_HOME))
+
+ val libPath = new File(s"$appHome/lib")
+ require(libPath.exists())
val majorVersion = flinkVersion.majorVersion
val scalaVersion = flinkVersion.scalaVersion
- logInfo(flinkVersion.toString)
+ val streamParkMatcher = getStreamParkLibPattern(scalaVersion)
+
+ libPath.listFiles().foreach((jar: File) => {
+ try {
+ val shimsMatcher = SHIMS_PATTERN.matcher(jar.getName)
+ if (shimsMatcher.matches()) {
+ if (majorVersion == shimsMatcher.group(1) && scalaVersion == shimsMatcher.group(2)) {
+ addShimUrl(jar)
+ }
+ } else {
+ if (INCLUDE_PATTERN.matcher(jar.getName).matches()) {
+ addShimUrl(jar)
+ logInfo(s"include jar lib: ${jar.getName}")
+ }
+ if (streamParkMatcher.matcher(jar.getName).matches()) {
+ addShimUrl(jar)
+ logInfo(s"include streampark lib: ${jar.getName}")
+ }
+ }
+ } catch {
+ case e: Exception => e.printStackTrace()
+ }
+ })
+ }
+
+ /**
+ * Get ClassLoader to verify sql
+ *
+ * @param flinkVersion flinkVersion
+ * @param func execute function
+ * @tparam T
+ * @return
+ */
+ def proxyVerifySql[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T = {
+ val shimsClassLoader = getVerifySqlLibClassLoader(flinkVersion)
+ ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
+ }
+
+ private[this] def getFlinkShimsClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
+ logInfo(s"add flink shims urls classloader,flink version: $flinkVersion")
SHIMS_CLASS_LOADER_CACHE.getOrElseUpdate(s"${flinkVersion.fullVersion}", {
// 1) flink/lib
- val libURL = getFlinkHomeLib(flinkVersion.flinkHome)
+ val libURL = getFlinkHomeLib(flinkVersion.flinkHome, "lib", !_.getName.startsWith("log4j"))
val shimsUrls = ListBuffer[URL](libURL: _*)
- // 2) shims jar
- val appHome = System.getProperty(ConfigConst.KEY_APP_HOME)
- require(appHome != null, String.format("%s is not found on System env.", ConfigConst.KEY_APP_HOME))
-
- val libPath = new File(s"$appHome/lib")
- require(libPath.exists())
-
- val streamParkMatcher = getStreamParkLibPattern(scalaVersion)
-
- libPath.listFiles().foreach(jar => {
- try {
- val shimsMatcher = SHIMS_PATTERN.matcher(jar.getName)
- if (shimsMatcher.matches()) {
- if (majorVersion == shimsMatcher.group(1) && scalaVersion == shimsMatcher.group(2)) {
- shimsUrls += jar.toURI.toURL
- }
- } else {
- if (INCLUDE_PATTERN.matcher(jar.getName).matches()) {
- shimsUrls += jar.toURI.toURL
- logInfo(s"include jar lib: ${jar.getName}")
- }
- if (streamParkMatcher.matcher(jar.getName).matches()) {
- shimsUrls += jar.toURI.toURL
- logInfo(s"include streampark lib: ${jar.getName}")
- }
- }
- } catch {
- case e: Exception => e.printStackTrace()
+ // 2) add all shims jar
+ addShimsUrls(flinkVersion, file => {
+ if (file != null) {
+ shimsUrls += file.toURI.toURL
}
})
new ChildFirstClassLoader(
shimsUrls.toArray,
Thread.currentThread().getContextClassLoader,
- getFlinkShimsResourcePattern(majorVersion)
+ getFlinkShimsResourcePattern(flinkVersion.majorVersion)
)
})
}
- private[this] def getFlinkHomeLib(flinkHome: String): List[URL] = {
- val file = new File(flinkHome, "lib")
+ private[this] def getFlinkHomeLib(flinkHome: String, childDir: String, filterFun: File => Boolean): List[URL] = {
+ val file = new File(flinkHome, childDir)
require(file.isDirectory, s"FLINK_HOME $file does not exist")
- file.listFiles.filter(!_.getName.startsWith("log4j")).map(_.toURI.toURL).toList
+ file.listFiles.filter(filterFun).map(_.toURI.toURL).toList
}
@throws[Exception]