Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/27 08:19:47 UTC

[GitHub] [flink] lsyldliu commented on a diff in pull request #20176: [FLINK-27660][table] add table api for registering function with resource

lsyldliu commented on code in PR #20176:
URL: https://github.com/apache/flink/pull/20176#discussion_r930660736


##########
flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java:
##########
@@ -104,6 +105,32 @@ public static MutableURLClassLoader create(
         }
     }
 
+    public static MutableURLClassLoader buildUserCodeClassLoader(

Review Comment:
   We don't need to introduce this method; please reuse the existing `public static MutableURLClassLoader create(
               final URL[] urls, final ClassLoader parent, final Configuration configuration)`.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -515,6 +517,95 @@ void createFunction(
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists);
 
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary system function by the specific
+     * class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method allow
+     * registering a user defined function by only provide a full path class name and an available
+     * resource which may be local or remote. User doesn't need to initialize the function instance
+     * in advance.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
+     * exists, it will be inaccessible in the current session. To make the permanent function
+     * available again one can drop the corresponding temporary system function.
+     *
+     * @param name The name under which the function will be registered globally.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
+     * specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
+     * defined function by only provide a full path class name and an available resource which may
+     * be local or remote. User doesn't need to initialize the function instance in advance.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createFunction(String path, String className, List<ResourceUri> resourceUris);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
+     * specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
+     * defined function by only provide a full path class name and an available resource which may
+     * be local or remote. User doesn't need to initialize the function instance in advance.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     * @param ignoreIfExists If a function exists under the given path and this flag is set, no
+     *     operation is executed. An exception is thrown otherwise.
+     */
+    void createFunction(

Review Comment:
   Please move this method so that it comes after `void createFunction(
               String path,
               Class<? extends UserDefinedFunction> functionClass,
               boolean ignoreIfExists);`



##########
flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java:
##########
@@ -142,7 +142,7 @@ private PackagedProgram(
                         ? Collections.emptyList()
                         : extractContainedLibraries(this.jarFile);
         this.userCodeClassLoader =
-                ClientUtils.buildUserCodeClassLoader(

Review Comment:
   Please don't change this.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -434,6 +436,33 @@ public void createFunction(
                 unresolvedIdentifier, functionClass, ignoreIfExists);
     }
 
+    @Override
+    public void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris) {
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        createTemporarySystemFunction(name, clazz);
+    }
+
+    @Override
+    public void createFunction(String path, String className, List<ResourceUri> resourceUris) {
+        createFunction(path, className, resourceUris, false);
+    }
+
+    @Override
+    public void createFunction(
+            String path, String className, List<ResourceUri> resourceUris, boolean ignoreIfExists) {
+        final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        functionCatalog.registerCatalogFunction(unresolvedIdentifier, clazz, ignoreIfExists);

Review Comment:
   ```
   functionCatalog.registerCatalogFunction(unresolvedIdentifier, className, resourceUris, ignoreIfExists);
   
   ```
   
   In `FunctionCatalog`, we can introduce methods like these:
   ```
       public void registerCatalogFunction(
               UnresolvedIdentifier unresolvedIdentifier,
               String className,
               List<ResourceUri> resourceUris,
               boolean ignoreIfExists) {
           final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
           final ObjectIdentifier normalizedIdentifier =
                   FunctionIdentifier.normalizeObjectIdentifier(identifier);
           registerCatalogFunction(normalizedIdentifier, className, resourceUris, ignoreIfExists);
       }
   ```
   
   ```
       private void registerCatalogFunction(
               ObjectIdentifier normalizedIdentifier,
               String className,
               List<ResourceUri> resourceUris,
               boolean ignoreIfExists) {
           final Catalog catalog =
                   catalogManager
                           .getCatalog(normalizedIdentifier.getCatalogName())
                           .orElseThrow(IllegalStateException::new);
           final ObjectPath path = normalizedIdentifier.toObjectPath();
   
           // we force users to deal with temporary catalog functions first
           if (tempCatalogFunctions.containsKey(normalizedIdentifier)) {
               if (ignoreIfExists) {
                   return;
               }
               throw new ValidationException(
                       String.format(
                               "Could not register catalog function. A temporary function '%s' does already exist. "
                                       + "Please drop the temporary function first.",
                               normalizedIdentifier.asSummaryString()));
           }
   
           if (catalog.functionExists(path)) {
               if (ignoreIfExists) {
                   return;
               }
               throw new ValidationException(
                       String.format(
                               "Could not register catalog function. A function '%s' does already exist.",
                               normalizedIdentifier.asSummaryString()));
           }
   
           final CatalogFunction catalogFunction =
                   new CatalogFunctionImpl(className, FunctionLanguage.JAVA, resourceUris);
           try {
               catalog.createFunction(path, catalogFunction, ignoreIfExists);
           } catch (Throwable t) {
               throw new TableException(
                       String.format(
                               "Could not register catalog function '%s'.",
                               normalizedIdentifier.asSummaryString()),
                       t);
           }
       }
   ```
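
   As a rough illustration of the check order in the suggestion above, here is a self-contained toy model. It is only a sketch: `RegistrationOrderSketch` and its in-memory maps are hypothetical stand-ins, not Flink classes, and it uses nothing beyond the JDK.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   /** Toy model of the check order; all names are hypothetical, not Flink API. */
   final class RegistrationOrderSketch {
       private final Map<String, String> temporary = new HashMap<>();
       private final Map<String, String> permanent = new HashMap<>();

       void registerTemporary(String id, String className) {
           temporary.put(id, className);
       }

       /** Returns true if a new permanent entry was created, false if the call was a no-op. */
       boolean registerPermanent(String id, String className, boolean ignoreIfExists) {
           // Temporary functions are checked first, mirroring the suggested method.
           if (temporary.containsKey(id)) {
               if (ignoreIfExists) {
                   return false;
               }
               throw new IllegalStateException(
                       "A temporary function '" + id + "' already exists; drop it first.");
           }
           // Only then is the permanent store consulted.
           if (permanent.containsKey(id)) {
               if (ignoreIfExists) {
                   return false;
               }
               throw new IllegalStateException("A function '" + id + "' already exists.");
           }
           permanent.put(id, className);
           return true;
       }
   }
   ```

   In this model, a second registration under the same identifier is a no-op when `ignoreIfExists` is `true` and fails otherwise, and an existing temporary function is detected before the permanent store is consulted.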



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -37,21 +37,32 @@ import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks
 import org.apache.flink.table.planner.utils.TableTestUtil.{replaceNodeIdInOperator, replaceStageId, replaceStreamNodeId}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
+import org.apache.flink.util.UserClassLoaderJarTestUtils
 
 import _root_.java.util
 import _root_.scala.collection.JavaConverters._
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql.SqlExplainLevel
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
-import org.junit.{Rule, Test}
+import org.junit.{ClassRule, Rule, Test}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
-import org.junit.rules.ExpectedException
+import org.junit.rules.{ExpectedException, TemporaryFolder}
+
+import scala.annotation.meta.getter
 
 class TableEnvironmentTest {
 
+  @(Rule @getter)
+  val tempFolder: TemporaryFolder = new TemporaryFolder()
+
   // used for accurate exception information checking.
   val expectedException: ExpectedException = ExpectedException.none()
 
+  val udfClass = "LowerUDF"
+
+  val udfCode =

Review Comment:
   Ditto: please reuse the related code in `UserDefinedFunctions`.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -434,6 +436,33 @@ public void createFunction(
                 unresolvedIdentifier, functionClass, ignoreIfExists);
     }
 
+    @Override
+    public void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris) {
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        createTemporarySystemFunction(name, clazz);
+    }
+
+    @Override
+    public void createFunction(String path, String className, List<ResourceUri> resourceUris) {
+        createFunction(path, className, resourceUris, false);
+    }
+
+    @Override
+    public void createFunction(
+            String path, String className, List<ResourceUri> resourceUris, boolean ignoreIfExists) {
+        final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        functionCatalog.registerCatalogFunction(unresolvedIdentifier, clazz, ignoreIfExists);
+    }
+
+    @Override
+    public void createTemporaryFunction(
+            String path, String className, List<ResourceUri> resourceUris) {
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        createTemporarySystemFunction(path, clazz);

Review Comment:
   ```
   functionCatalog.registerTemporaryCatalogFunction(
                   unresolvedIdentifier,
                   new CatalogFunctionImpl(className, FunctionLanguage.JAVA, resourceUris),
                   false);
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -515,6 +517,95 @@ void createFunction(
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists);
 
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary system function by the specific
+     * class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method allow
+     * registering a user defined function by only provide a full path class name and an available
+     * resource which may be local or remote. User doesn't need to initialize the function instance
+     * in advance.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
+     * exists, it will be inaccessible in the current session. To make the permanent function
+     * available again one can drop the corresponding temporary system function.
+     *
+     * @param name The name under which the function will be registered globally.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
+     * specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
+     * defined function by only provide a full path class name and an available resource which may
+     * be local or remote. User doesn't need to initialize the function instance in advance.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createFunction(String path, String className, List<ResourceUri> resourceUris);

Review Comment:
   Please move this method so that it comes after `void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);`



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1760,8 +1809,13 @@ private Optional<CatalogBaseTable> getTemporaryTable(ObjectIdentifier identifier
     private TableResultInternal createCatalogFunction(
             CreateCatalogFunctionOperation createCatalogFunctionOperation) {
         String exMsg = getDDLOpExecuteErrorMsg(createCatalogFunctionOperation.asSummaryString());
+        List<ResourceUri> resourceUris =
+                createCatalogFunctionOperation.getCatalogFunction().getFunctionResources();
         try {
             if (createCatalogFunctionOperation.isTemporary()) {
+                if (!resourceUris.isEmpty()) {

Review Comment:
   We don't need to register the JAR resource here; it will be registered in `FunctionCatalog`. Please revert these changes.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -515,6 +517,95 @@ void createFunction(
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists);
 
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary system function by the specific
+     * class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method allow
+     * registering a user defined function by only provide a full path class name and an available
+     * resource which may be local or remote. User doesn't need to initialize the function instance
+     * in advance.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
+     * exists, it will be inaccessible in the current session. To make the permanent function
+     * available again one can drop the corresponding temporary system function.
+     *
+     * @param name The name under which the function will be registered globally.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createTemporarySystemFunction(

Review Comment:
   Please move this method so that it comes after `void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);`



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -515,6 +517,95 @@ void createFunction(
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists);
 
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary system function by the specific
+     * class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method allow
+     * registering a user defined function by only provide a full path class name and an available
+     * resource which may be local or remote. User doesn't need to initialize the function instance

Review Comment:
   Suggested wording: "and an available resource URI list which may be local or remote".



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -434,6 +436,33 @@ public void createFunction(
                 unresolvedIdentifier, functionClass, ignoreIfExists);
     }
 
+    @Override
+    public void createTemporarySystemFunction(

Review Comment:
   I think we should not register the resource and load the class in `TableEnvironment` in advance. Because `FunctionCatalog` is responsible for registering functions, we should let `FunctionCatalog` register resources and load classes uniformly instead of doing this in multiple places. We should do our best to reuse this code.
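
   To make the idea concrete, here is a minimal, self-contained sketch of a catalog that owns both steps: callers pass only a class name and resource locations, and the class is loaded in exactly one place. `CatalogSketch`, `register` and `resolve` are hypothetical names for illustration, not Flink's `FunctionCatalog` API, and a plain JDK `URLClassLoader` stands in for Flink's user-code class loader.

   ```java
   import java.net.URL;
   import java.net.URLClassLoader;
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical stand-in for a catalog; this is not Flink's FunctionCatalog. */
   final class CatalogSketch {

       /** Metadata handed over by the caller: a class name plus the resources containing it. */
       private static final class Entry {
           final String className;
           final List<URL> resources;

           Entry(String className, List<URL> resources) {
               this.className = className;
               this.resources = new ArrayList<>(resources);
           }
       }

       private final Map<String, Entry> functions = new HashMap<>();

       /** Registration only records metadata; no class is loaded here. */
       void register(String name, String className, List<URL> resources) {
           if (functions.containsKey(name)) {
               throw new IllegalStateException("Function '" + name + "' already exists.");
           }
           functions.put(name, new Entry(className, resources));
       }

       /** The single place where resources become a class loader and the class is loaded. */
       Class<?> resolve(String name) {
           Entry entry = functions.get(name);
           if (entry == null) {
               throw new IllegalArgumentException("Unknown function '" + name + "'.");
           }
           // The loader is left open on purpose: closing it could break later
           // lazy loading of classes that the function depends on.
           URLClassLoader loader =
                   new URLClassLoader(
                           entry.resources.toArray(new URL[0]),
                           CatalogSketch.class.getClassLoader());
           try {
               return Class.forName(entry.className, false, loader);
           } catch (ClassNotFoundException e) {
               throw new IllegalStateException("Could not load '" + entry.className + "'.", e);
           }
       }
   }
   ```

   With this split, the environment-level methods only forward the class name and resource list, so resource handling and class loading are not duplicated across call sites.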



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -434,6 +436,33 @@ public void createFunction(
                 unresolvedIdentifier, functionClass, ignoreIfExists);
     }
 
+    @Override
+    public void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris) {
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        createTemporarySystemFunction(name, clazz);

Review Comment:
   Here I think we can call the related `FunctionCatalog` method directly:
   
   ```
   functionCatalog.registerTemporarySystemFunction(name, className, FunctionLanguage.JAVA, resourceUris, false);
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -515,6 +517,95 @@ void createFunction(
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists);
 
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary system function by the specific
+     * class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method allow
+     * registering a user defined function by only provide a full path class name and an available
+     * resource which may be local or remote. User doesn't need to initialize the function instance
+     * in advance.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
+     * exists, it will be inaccessible in the current session. To make the permanent function
+     * available again one can drop the corresponding temporary system function.
+     *
+     * @param name The name under which the function will be registered globally.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
+     * specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
+     * defined function by only provide a full path class name and an available resource which may
+     * be local or remote. User doesn't need to initialize the function instance in advance.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createFunction(String path, String className, List<ResourceUri> resourceUris);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
+     * specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
+     * defined function by only provide a full path class name and an available resource which may
+     * be local or remote. User doesn't need to initialize the function instance in advance.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     * @param ignoreIfExists If a function exists under the given path and this flag is set, no
+     *     operation is executed. An exception is thrown otherwise.
+     */
+    void createFunction(
+            String path, String className, List<ResourceUri> resourceUris, boolean ignoreIfExists);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary catalog function in the given
+     * path by the specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporaryFunction(String, Class)}, this method allow registering
+     * a user defined function by only provide a full path class name and an available resource uri
+     * which may be local or remote. User doesn't need to initialize the function instance in
+     * advance.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, String, List)} with a globally
+     * defined name, catalog functions are always (implicitly or explicitly) identified by a catalog
+     * and database.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
+     * exists, it will be inaccessible in the current session. To make the permanent function
+     * available again one can drop the corresponding temporary function.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list udf resource uri in local or remote.
+     */
+    void createTemporaryFunction(String path, String className, List<ResourceUri> resourceUris);

Review Comment:
   Please move this method so that it comes after `void createTemporaryFunction(String path, UserDefinedFunction functionInstance);`



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1847,12 +1907,23 @@ private TableResultInternal dropCatalogFunction(
 
     private TableResultInternal createSystemFunction(CreateTempSystemFunctionOperation operation) {
         String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
+        List<ResourceUri> resourceUris = operation.getCatalogFunction().getFunctionResources();
+
         try {
+            if (!resourceUris.isEmpty()) {

Review Comment:
   We don't need to register the JAR resource here; it will be registered in `FunctionCatalog`. Please see https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java#L655



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1750,6 +1779,26 @@ public void registerTableSinkInternal(String name, TableSink<?> tableSink) {
         }
     }
 
+    private Class loadClassFromResource(String className, List<ResourceUri> resourceUris) {

Review Comment:
   Revert this method.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -37,21 +37,32 @@ import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks
 import org.apache.flink.table.planner.utils.TableTestUtil.{replaceNodeIdInOperator, replaceStageId, replaceStreamNodeId}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
+import org.apache.flink.util.UserClassLoaderJarTestUtils
 
 import _root_.java.util
 import _root_.scala.collection.JavaConverters._
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql.SqlExplainLevel
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
-import org.junit.{Rule, Test}
+import org.junit.{ClassRule, Rule, Test}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
-import org.junit.rules.ExpectedException
+import org.junit.rules.{ExpectedException, TemporaryFolder}
+
+import scala.annotation.meta.getter
 
 class TableEnvironmentTest {
 
+  @(Rule @getter)
+  val tempFolder: TemporaryFolder = new TemporaryFolder()
+
   // used for accurate exception information checking.
   val expectedException: ExpectedException = ExpectedException.none()
 
+  val udfClass = "LowerUDF"

Review Comment:
   Please reuse the related code in `UserDefinedFunctions`.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -706,6 +717,40 @@ class TableEnvironmentTest {
     assertFalse(tableEnv.listUserDefinedFunctions().contains("f2"))
   }
 
+  @Test
+  def testExecuteSqlWithCreateFunctionWithResource(): Unit = {
+    val userJar = UserClassLoaderJarTestUtils.createJarFile(
+      tempFolder.newFolder("test-jar"),
+      "test-classloader.jar",
+      udfClass,
+      udfCode)
+
+    val tableResult1 = tableEnv.executeSql(

Review Comment:
   I think this test is not correct. The aim of this PR is to support registering functions through the Table API, so we should test registration through the Table API instead of SQL. Please add tests in `FunctionITCase` and verify that the function works in both streaming and batch mode; refer to the related tests in `FunctionITCase`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org