You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/11/09 03:44:47 UTC

[flink] branch release-1.14 updated: [FLINK-24761][table] Fix PartitionPruner code gen compile fail

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new aa6ef87  [FLINK-24761][table] Fix PartitionPruner code gen compile fail
aa6ef87 is described below

commit aa6ef87029ea5f24d831dea0b89c0fa47d9e13ba
Author: zhangmang <zh...@163.com>
AuthorDate: Tue Nov 9 11:44:24 2021 +0800

    [FLINK-24761][table] Fix PartitionPruner code gen compile fail
    
    This closes #17724
---
 .../flink/table/client/cli/CliClientITCase.java    |  8 +++--
 .../client/gateway/context/SessionContextTest.java |  8 +++--
 .../client/gateway/local/LocalExecutorITCase.java  |  8 +++--
 .../client/gateway/utils/UserDefinedFunctions.java | 11 ++++++
 .../table/planner/plan/utils/PartitionPruner.scala |  2 +-
 .../flink/table}/utils/TestUserClassLoaderJar.java | 26 +++++---------
 .../batch/sql/PartitionableSourceITCase.scala      | 41 ++++++++++++++++++++--
 7 files changed, 77 insertions(+), 27 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index 40bb3ac..4292bc9 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -24,7 +24,8 @@ import org.apache.flink.table.client.cli.utils.TestSqlStatement;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.context.DefaultContext;
 import org.apache.flink.table.client.gateway.local.LocalExecutor;
-import org.apache.flink.table.client.gateway.utils.TestUserClassLoaderJar;
+import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
+import org.apache.flink.table.utils.TestUserClassLoaderJar;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import org.apache.flink.shaded.guava30.com.google.common.io.PatternFilenameFilter;
@@ -98,7 +99,10 @@ public class CliClientITCase extends AbstractTestBase {
     public static void setup() throws IOException {
         File udfJar =
                 TestUserClassLoaderJar.createJarFile(
-                        tempFolder.newFolder("test-jar"), "test-classloader-udf.jar");
+                        tempFolder.newFolder("test-jar"),
+                        "test-classloader-udf.jar",
+                        UserDefinedFunctions.GENERATED_UDF_CLASS,
+                        UserDefinedFunctions.GENERATED_UDF_CODE);
         URL udfDependency = udfJar.toURI().toURL();
         historyPath = tempFolder.newFile("history").toPath();
 
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
index 3fd7074..36dadee 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
@@ -20,7 +20,8 @@ package org.apache.flink.table.client.gateway.context;
 
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.client.gateway.utils.TestUserClassLoaderJar;
+import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
+import org.apache.flink.table.utils.TestUserClassLoaderJar;
 
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -61,7 +62,10 @@ public class SessionContextTest {
     public static void prepare() throws Exception {
         udfJar =
                 TestUserClassLoaderJar.createJarFile(
-                        tempFolder.newFolder("test-jar"), "test-classloader-udf.jar");
+                        tempFolder.newFolder("test-jar"),
+                        "test-classloader-udf.jar",
+                        UserDefinedFunctions.GENERATED_UDF_CLASS,
+                        UserDefinedFunctions.GENERATED_UDF_CODE);
     }
 
     @Before
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 8e3cc18..d2093a5 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -34,12 +34,13 @@ import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.context.DefaultContext;
-import org.apache.flink.table.client.gateway.utils.TestUserClassLoaderJar;
+import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
 import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.TableUDF;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.utils.TestUserClassLoaderJar;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
@@ -103,7 +104,10 @@ public class LocalExecutorITCase extends TestLogger {
         clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
         File udfJar =
                 TestUserClassLoaderJar.createJarFile(
-                        tempFolder.newFolder("test-jar"), "test-classloader-udf.jar");
+                        tempFolder.newFolder("test-jar"),
+                        "test-classloader-udf.jar",
+                        UserDefinedFunctions.GENERATED_UDF_CLASS,
+                        UserDefinedFunctions.GENERATED_UDF_CODE);
         udfDependency = udfJar.toURI().toURL();
     }
 
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
index 94130fc..d0cd826 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
@@ -29,6 +29,17 @@ import org.apache.flink.types.Row;
 /** A bunch of UDFs for testing the SQL Client. */
 public class UserDefinedFunctions {
 
+    public static final String GENERATED_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_UDF_CODE =
+            "public class "
+                    + GENERATED_UDF_CLASS
+                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
     /** The scalar function for SQL Client test. */
     public static class ScalarUDF extends ScalarFunction {
         public String eval(Integer i, Integer offset) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
index 9c397a8..1ed5ac3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/PartitionPruner.scala
@@ -109,7 +109,7 @@ object PartitionPruner {
       inputType,
       collectorTerm = collectorTerm)
 
-    val function = genFunction.newInstance(getClass.getClassLoader)
+    val function = genFunction.newInstance(Thread.currentThread().getContextClassLoader)
     val richMapFunction = function match {
       case r: RichMapFunction[GenericRowData, Boolean] => r
       case _ => throw new TableException("RichMapFunction[GenericRowData, Boolean] required here")
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestUserClassLoaderJar.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
similarity index 74%
rename from flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestUserClassLoaderJar.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
index 921e513..4765f0d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestUserClassLoaderJar.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.client.gateway.utils;
+package org.apache.flink.table.utils;
 
 import org.apache.flink.util.FileUtils;
 
@@ -34,27 +34,17 @@ import java.util.Collections;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 
-/** Mainly used for testing classloading of UDF dependencies. */
+/** Mainly used for testing classloading. */
 public class TestUserClassLoaderJar {
 
-    private static final String GENERATED_UDF_CLASS = "LowerUDF";
-
-    private static final String GENERATED_UDF_CODE =
-            "public class "
-                    + GENERATED_UDF_CLASS
-                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
-                    + "  public String eval(String str) {\n"
-                    + "    return str.toLowerCase();\n"
-                    + "  }\n"
-                    + "}\n";
-
     /** Pack the generated UDF class into a JAR and return the path of the JAR. */
-    public static File createJarFile(File tmpDir, String jarName) throws IOException {
+    public static File createJarFile(File tmpDir, String jarName, String className, String javaCode)
+            throws IOException {
         // write class source code to file
-        File javaFile = Paths.get(tmpDir.toString(), GENERATED_UDF_CLASS + ".java").toFile();
+        File javaFile = Paths.get(tmpDir.toString(), className + ".java").toFile();
         //noinspection ResultOfMethodCallIgnored
         javaFile.createNewFile();
-        FileUtils.writeFileUtf8(javaFile, GENERATED_UDF_CODE);
+        FileUtils.writeFileUtf8(javaFile, javaCode);
 
         // compile class source code
         DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<>();
@@ -74,10 +64,10 @@ public class TestUserClassLoaderJar {
         task.call();
 
         // pack class file to jar
-        File classFile = Paths.get(tmpDir.toString(), GENERATED_UDF_CLASS + ".class").toFile();
+        File classFile = Paths.get(tmpDir.toString(), className + ".class").toFile();
         File jarFile = Paths.get(tmpDir.toString(), jarName).toFile();
         JarOutputStream jos = new JarOutputStream(new FileOutputStream(jarFile));
-        JarEntry jarEntry = new JarEntry(GENERATED_UDF_CLASS + ".class");
+        JarEntry jarEntry = new JarEntry(className + ".class");
         jos.putNextEntry(jarEntry);
         byte[] classBytes = FileUtils.readAllBytes(classFile.toPath());
         jos.write(classBytes);
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
index 39653c5..0b71233 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
@@ -18,16 +18,24 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql
 
-import java.util
-
+import org.apache.flink.client.ClientUtils
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, ObjectPath}
 import org.apache.flink.table.planner.factories.{TestValuesCatalog, TestValuesTableFactory}
+import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.utils.TestUserClassLoaderJar
+import org.apache.flink.util.TemporaryClassLoaderContext
+
 import org.junit.{Before, Test}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
+import java.io.File
+import java.net.URL
+import java.util
+
 import scala.collection.JavaConversions._
 
 @RunWith(classOf[Parameterized])
@@ -120,6 +128,35 @@ class PartitionableSourceITCase(
       )
     )
   }
+
+  @Test
+  def testPartitionPrunerCompileClassLoader(): Unit = {
+    val udfJavaCode =
+      s"""
+         |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction {
+         |   public String eval(String str) {
+         |     return str.trim();
+         |   }
+         |}
+         |""".stripMargin
+    val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+    val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+      tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+    val jars: util.List[URL] = util.Collections.singletonList(udfJarFile.toURI.toURL)
+    val cl = ClientUtils.buildUserCodeClassLoader(jars, util.Collections.emptyList(),
+      getClass.getClassLoader, new Configuration())
+    val ctx = TemporaryClassLoaderContext.of(cl)
+    try {
+      tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'")
+      checkResult("select * from MyTable where trimUDF(part1) = 'A' and part2 > 1",
+        Seq(
+          row(3, "Jack", "A", 2, 3)
+        )
+      )
+    } finally {
+      ctx.close()
+    }
+  }
 }
 
 object PartitionableSourceITCase {