You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/11/09 03:44:47 UTC
[flink] branch release-1.14 updated: [FLINK-24761][table] Fix
PartitionPruner code gen compile fail
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new aa6ef87 [FLINK-24761][table] Fix PartitionPruner code gen compile fail
aa6ef87 is described below
commit aa6ef87029ea5f24d831dea0b89c0fa47d9e13ba
Author: zhangmang <zh...@163.com>
AuthorDate: Tue Nov 9 11:44:24 2021 +0800
[FLINK-24761][table] Fix PartitionPruner code gen compile fail
This closes #17724
---
.../flink/table/client/cli/CliClientITCase.java | 8 +++--
.../client/gateway/context/SessionContextTest.java | 8 +++--
.../client/gateway/local/LocalExecutorITCase.java | 8 +++--
.../client/gateway/utils/UserDefinedFunctions.java | 11 ++++++
.../table/planner/plan/utils/PartitionPruner.scala | 2 +-
.../flink/table}/utils/TestUserClassLoaderJar.java | 26 +++++---------
.../batch/sql/PartitionableSourceITCase.scala | 41 ++++++++++++++++++++--
7 files changed, 77 insertions(+), 27 deletions(-)
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index 40bb3ac..4292bc9 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -24,7 +24,8 @@ import org.apache.flink.table.client.cli.utils.TestSqlStatement;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.context.DefaultContext;
import org.apache.flink.table.client.gateway.local.LocalExecutor;
-import org.apache.flink.table.client.gateway.utils.TestUserClassLoaderJar;
+import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
+import org.apache.flink.table.utils.TestUserClassLoaderJar;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.shaded.guava30.com.google.common.io.PatternFilenameFilter;
@@ -98,7 +99,10 @@ public class CliClientITCase extends AbstractTestBase {
public static void setup() throws IOException {
File udfJar =
TestUserClassLoaderJar.createJarFile(
- tempFolder.newFolder("test-jar"), "test-classloader-udf.jar");
+ tempFolder.newFolder("test-jar"),
+ "test-classloader-udf.jar",
+ UserDefinedFunctions.GENERATED_UDF_CLASS,
+ UserDefinedFunctions.GENERATED_UDF_CODE);
URL udfDependency = udfJar.toURI().toURL();
historyPath = tempFolder.newFile("history").toPath();
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
index 3fd7074..36dadee 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
@@ -20,7 +20,8 @@ package org.apache.flink.table.client.gateway.context;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.client.gateway.utils.TestUserClassLoaderJar;
+import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
+import org.apache.flink.table.utils.TestUserClassLoaderJar;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -61,7 +62,10 @@ public class SessionContextTest {
public static void prepare() throws Exception {
udfJar =
TestUserClassLoaderJar.createJarFile(
- tempFolder.newFolder("test-jar"), "test-classloader-udf.jar");
+ tempFolder.newFolder("test-jar"),
+ "test-classloader-udf.jar",
+ UserDefinedFunctions.GENERATED_UDF_CLASS,
+ UserDefinedFunctions.GENERATED_UDF_CODE);
}
@Before
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 8e3cc18..d2093a5 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -34,12 +34,13 @@ import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.context.DefaultContext;
-import org.apache.flink.table.client.gateway.utils.TestUserClassLoaderJar;
+import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.TableUDF;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.utils.TestUserClassLoaderJar;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
@@ -103,7 +104,10 @@ public class LocalExecutorITCase extends TestLogger {
clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
File udfJar =
TestUserClassLoaderJar.createJarFile(
- tempFolder.newFolder("test-jar"), "test-classloader-udf.jar");
+ tempFolder.newFolder("test-jar"),
+ "test-classloader-udf.jar",
+ UserDefinedFunctions.GENERATED_UDF_CLASS,
+ UserDefinedFunctions.GENERATED_UDF_CODE);
udfDependency = udfJar.toURI().toURL();
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
index 94130fc..d0cd826 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
@@ -29,6 +29,17 @@ import org.apache.flink.types.Row;
/** A bunch of UDFs for testing the SQL Client. */
public class UserDefinedFunctions {
+ public static final String GENERATED_UDF_CLASS = "LowerUDF";
+
+ public static final String GENERATED_UDF_CODE =
+ "public class "
+ + GENERATED_UDF_CLASS
+ + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+ + " public String eval(String str) {\n"
+ + " return str.toLowerCase();\n"
+ + " }\n"
+ + "}\n";
+
/** The scalar function for SQL Client test. */
public static class ScalarUDF extends ScalarFunction {
public String eval(Integer i, Integer offset) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
index 9c397a8..1ed5ac3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
@@ -109,7 +109,7 @@ object PartitionPruner {
inputType,
collectorTerm = collectorTerm)
- val function = genFunction.newInstance(getClass.getClassLoader)
+ val function = genFunction.newInstance(Thread.currentThread().getContextClassLoader)
val richMapFunction = function match {
case r: RichMapFunction[GenericRowData, Boolean] => r
case _ => throw new TableException("RichMapFunction[GenericRowData, Boolean] required here")
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestUserClassLoaderJar.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
similarity index 74%
rename from flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestUserClassLoaderJar.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
index 921e513..4765f0d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestUserClassLoaderJar.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.client.gateway.utils;
+package org.apache.flink.table.utils;
import org.apache.flink.util.FileUtils;
@@ -34,27 +34,17 @@ import java.util.Collections;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
-/** Mainly used for testing classloading of UDF dependencies. */
+/** Mainly used for testing classloading. */
public class TestUserClassLoaderJar {
- private static final String GENERATED_UDF_CLASS = "LowerUDF";
-
- private static final String GENERATED_UDF_CODE =
- "public class "
- + GENERATED_UDF_CLASS
- + " extends org.apache.flink.table.functions.ScalarFunction {\n"
- + " public String eval(String str) {\n"
- + " return str.toLowerCase();\n"
- + " }\n"
- + "}\n";
-
/** Pack the generated UDF class into a JAR and return the path of the JAR. */
- public static File createJarFile(File tmpDir, String jarName) throws IOException {
+ public static File createJarFile(File tmpDir, String jarName, String className, String javaCode)
+ throws IOException {
// write class source code to file
- File javaFile = Paths.get(tmpDir.toString(), GENERATED_UDF_CLASS + ".java").toFile();
+ File javaFile = Paths.get(tmpDir.toString(), className + ".java").toFile();
//noinspection ResultOfMethodCallIgnored
javaFile.createNewFile();
- FileUtils.writeFileUtf8(javaFile, GENERATED_UDF_CODE);
+ FileUtils.writeFileUtf8(javaFile, javaCode);
// compile class source code
DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<>();
@@ -74,10 +64,10 @@ public class TestUserClassLoaderJar {
task.call();
// pack class file to jar
- File classFile = Paths.get(tmpDir.toString(), GENERATED_UDF_CLASS + ".class").toFile();
+ File classFile = Paths.get(tmpDir.toString(), className + ".class").toFile();
File jarFile = Paths.get(tmpDir.toString(), jarName).toFile();
JarOutputStream jos = new JarOutputStream(new FileOutputStream(jarFile));
- JarEntry jarEntry = new JarEntry(GENERATED_UDF_CLASS + ".class");
+ JarEntry jarEntry = new JarEntry(className + ".class");
jos.putNextEntry(jarEntry);
byte[] classBytes = FileUtils.readAllBytes(classFile.toPath());
jos.write(classBytes);
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
index 39653c5..0b71233 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
@@ -18,16 +18,24 @@
package org.apache.flink.table.planner.runtime.batch.sql
-import java.util
-
+import org.apache.flink.client.ClientUtils
+import org.apache.flink.configuration.Configuration
import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, ObjectPath}
import org.apache.flink.table.planner.factories.{TestValuesCatalog, TestValuesTableFactory}
+import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.utils.TestUserClassLoaderJar
+import org.apache.flink.util.TemporaryClassLoaderContext
+
import org.junit.{Before, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import java.io.File
+import java.net.URL
+import java.util
+
import scala.collection.JavaConversions._
@RunWith(classOf[Parameterized])
@@ -120,6 +128,35 @@ class PartitionableSourceITCase(
)
)
}
+
+ @Test
+ def testPartitionPrunerCompileClassLoader(): Unit = {
+ val udfJavaCode =
+ s"""
+ |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction {
+ | public String eval(String str) {
+ | return str.trim();
+ | }
+ |}
+ |""".stripMargin
+ val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+ val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+ tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+ val jars: util.List[URL] = util.Collections.singletonList(udfJarFile.toURI.toURL)
+ val cl = ClientUtils.buildUserCodeClassLoader(jars, util.Collections.emptyList(),
+ getClass.getClassLoader, new Configuration())
+ val ctx = TemporaryClassLoaderContext.of(cl)
+ try {
+ tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'")
+ checkResult("select * from MyTable where trimUDF(part1) = 'A' and part2 > 1",
+ Seq(
+ row(3, "Jack", "A", 2, 3)
+ )
+ )
+ } finally {
+ ctx.close()
+ }
+ }
}
object PartitionableSourceITCase {