Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/15 08:20:29 UTC

[GitHub] [flink] WeiZhong94 opened a new pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

WeiZhong94 opened a new pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749
 
 
   ## What is the purpose of the change
   
   *This pull request adds support for Python UDFs in SQL function DDL.*
   
   
   ## Brief change log
   
     - *Refactor `FunctionCatalog` and `TableEnvironment` to instantiate functions as late as possible.*
     - *Add the 'PYTHON' keyword to the SQL parser to support Python UDFs in SQL DDL.*
     - *Add `PythonFunctionFactory` to request the Python UDF metadata from the Python process.*
   
   
   ## Verifying this change
   This change is already covered by existing tests, such as *FunctionCatalogTest* and *PythonFunctionSqlTest*.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409259319
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
 		return process;
 	}
+
+	static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) {
+		pythonProcess.destroy();
+		try {
+			pythonProcess.waitFor(timeoutMillis, TimeUnit.MILLISECONDS);
+		} catch (InterruptedException e) {
+			throw new RuntimeException("Interrupt while waiting for the python process to stop.", e);
+		}
+		if (pythonProcess.isAlive()) {
+			pythonProcess.destroyForcibly();
+		}
+	}
+
+	private static int findFreePort() throws IOException {
+		ServerSocket socket = new ServerSocket(0);
+		int port = socket.getLocalPort();
+		socket.close();
+		return port;
+	}
+
+	/**
+	 * Creates a GatewayServer run in a daemon thread.
+	 *
+	 * @return The created GatewayServer
+	 */
+	static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
+		CompletableFuture<GatewayServer> gatewayServer = new CompletableFuture<>();
+		Thread thread = new Thread(() -> {
+			int freePort;
+			try {
+				freePort = findFreePort();
+			} catch (IOException e) {
+				throw new RuntimeException("Could not find a free port for Py4jCallbackClient.");
+			}
+			GatewayServer server = new GatewayServer.GatewayServerBuilder()
+				.gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort)))
+				.javaPort(0)
+				.build();
+			gatewayServer.complete(server);
+			server.start(true);
+		});
+		thread.setName("py4j-gateway");
+		thread.setDaemon(true);
+		thread.start();
+		thread.join();
+		return gatewayServer.get();
+	}
+
+	static Process launchPy4jPythonClient(
+		GatewayServer gatewayServer,
+		ReadableConfig config,
+		List<String> commands,
+		String entryPointScript,
+		String tmpDir) throws IOException {
+		PythonEnvUtils.PythonEnvironment pythonEnv = PythonEnvUtils.preparePythonEnvironment(
 
 Review comment:
   The `PythonEnvUtils` prefix could be removed, as the method is also in `PythonEnvUtils`.


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409264137
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonFunctionFactory.java
 ##########
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.functions.python.PythonFunction;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The factory which creates the PythonFunction objects from given module name and object name.
+ */
+public interface PythonFunctionFactory {
+
+	/**
+	 * Returns PythonFunction according to moduleName and objectName. The current environment is also
+	 * needed because different environments have different code generation logic.
+	 *
+	 * @param moduleName The module name of the Python UDF.
+	 * @param objectName The function name / class name of the Python UDF.
+	 * @return The PythonFunction object which represents the Python UDF.
+	 */
+	PythonFunction getPythonFunction(String moduleName, String objectName);
+
+	/**
+	 * Returns PythonFunction according to the fully qualified name of the Python UDF
+	 * i.e ${moduleName}.${functionName} or ${moduleName}.${className}. The current environment is also
+	 * needed because different environments have different code generation logic.
 
 Review comment:
   Not quite sure what you mean by `The current environment is also needed because different environments have different code generation logic.` Could you clarify?
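
   For illustration only, here is a minimal sketch of how a fully qualified name of the form `${moduleName}.${objectName}` could be split into the two arguments of `getPythonFunction(String moduleName, String objectName)`. The class `QualifiedPythonName` and the rule of splitting at the last dot are assumptions made for this sketch; the pull request may do this differently.

```java
// Hypothetical helper, not part of the pull request.
final class QualifiedPythonName {
    final String moduleName;
    final String objectName;

    private QualifiedPythonName(String moduleName, String objectName) {
        this.moduleName = moduleName;
        this.objectName = objectName;
    }

    // Assumption: the object name is everything after the last dot,
    // so "pkg.mod.func" yields module "pkg.mod" and object "func".
    static QualifiedPythonName parse(String fullyQualifiedName) {
        int splitIndex = fullyQualifiedName.lastIndexOf('.');
        if (splitIndex <= 0 || splitIndex == fullyQualifiedName.length() - 1) {
            throw new IllegalArgumentException(
                "Expected ${moduleName}.${objectName} but got: " + fullyQualifiedName);
        }
        return new QualifiedPythonName(
            fullyQualifiedName.substring(0, splitIndex),
            fullyQualifiedName.substring(splitIndex + 1));
    }

    public static void main(String[] args) {
        QualifiedPythonName name = parse("test1.func1");
        System.out.println(name.moduleName + " / " + name.objectName);
    }
}
```

   Splitting at the last dot keeps dotted package paths inside the module name, which matches the `'test1.func1'` style used in the DDL statements of this pull request's tests.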


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409973891
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 ##########
 @@ -259,10 +259,17 @@ private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) {
 			return new CreateTempSystemFunctionOperation(
 				unresolvedIdentifier.getObjectName(),
 				sqlCreateFunction.getFunctionClassName().getValueAs(String.class),
-				sqlCreateFunction.isIfNotExists()
+				sqlCreateFunction.isIfNotExists(),
+				parseLanguage(sqlCreateFunction.getFunctionLanguage())
 			);
 		} else {
 			FunctionLanguage language = parseLanguage(sqlCreateFunction.getFunctionLanguage());
+			if (language == FunctionLanguage.PYTHON && !sqlCreateFunction.isTemporary()) {
+				throw new ValidationException(String.format(
 
 Review comment:
   Please fix the warning here.


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409262922
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
 		return process;
 	}
+
+	static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) {
+		pythonProcess.destroy();
+		try {
+			pythonProcess.waitFor(timeoutMillis, TimeUnit.MILLISECONDS);
+		} catch (InterruptedException e) {
+			throw new RuntimeException("Interrupt while waiting for the python process to stop.", e);
+		}
+		if (pythonProcess.isAlive()) {
+			pythonProcess.destroyForcibly();
+		}
+	}
+
+	private static int findFreePort() throws IOException {
+		ServerSocket socket = new ServerSocket(0);
+		int port = socket.getLocalPort();
+		socket.close();
+		return port;
+	}
+
+	/**
+	 * Creates a GatewayServer run in a daemon thread.
+	 *
+	 * @return The created GatewayServer
+	 */
+	static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
+		CompletableFuture<GatewayServer> gatewayServer = new CompletableFuture<>();
+		Thread thread = new Thread(() -> {
+			int freePort;
+			try {
+				freePort = findFreePort();
+			} catch (IOException e) {
+				throw new RuntimeException("Could not find a free port for Py4jCallbackClient.");
+			}
+			GatewayServer server = new GatewayServer.GatewayServerBuilder()
+				.gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort)))
+				.javaPort(0)
 
 Review comment:
   Just noticed that `.javaAddress(localhost)` was removed. What's the reason?
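
   As background for why the bind address matters, the following sketch uses only `java.net` to contrast a socket bound to the loopback interface with one bound to the wildcard address. It is not py4j code, the class `LoopbackBindingSketch` is hypothetical, and it makes no claim about which address the py4j builder uses when `javaAddress` is omitted.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical illustration, independent of py4j.
final class LoopbackBindingSketch {

    // Binds an ephemeral port on the loopback interface only, so the
    // socket is reachable from the local machine but not from the network.
    static boolean bindsToLoopback() throws IOException {
        try (ServerSocket socket =
                new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            return socket.getInetAddress().isLoopbackAddress();
        }
    }

    // Binds an ephemeral port without an explicit address, which
    // listens on the wildcard address, i.e. on all interfaces.
    static boolean bindsToWildcard() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getInetAddress().isAnyLocalAddress();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("loopback only: " + bindsToLoopback());
        System.out.println("wildcard: " + bindsToWildcard());
    }
}
```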


[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   ## CI report:
   
   * 7cbe2380ac7246cf703397d9a78d002c0491a959 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160664905) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7621) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   ## CI report:
   
   * 99a7954f7800e29e024c303e8428e2e9fe4a5ca0 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160344550) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7499) 
   * d2603c1936ff28ab745c711224857bba0693edd8 UNKNOWN
   


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409971668
 
 

 ##########
 File path: flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
 ##########
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+
+/**
+ * Tests for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryTest {
+
+	private String tmpdir = "";
+	private BatchTableEnvironment flinkTableEnv;
+	private StreamTableEnvironment blinkTableEnv;
+	private Table flinkSourceTable;
+	private Table blinkSourceTable;
+
+	@Before
+	public void prepareEnvironment() throws Exception {
+		tmpdir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()).getAbsolutePath();
+		new File(tmpdir).mkdir();
+		File pyFilePath = new File(tmpdir, "test1.py");
+		try (OutputStream out = new FileOutputStream(pyFilePath)) {
+			String code = ""
+				+ "from pyflink.table.udf import udf\n"
+				+ "from pyflink.table import DataTypes\n"
+				+ "@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())\n"
+				+ "def func1(str):\n"
+				+ "    return str + str\n";
+			out.write(code.getBytes());
+		}
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		flinkTableEnv = BatchTableEnvironment.create(env);
+		flinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, pyFilePath.getAbsolutePath());
+		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		blinkTableEnv = StreamTableEnvironment.create(
+			sEnv, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
+		blinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, pyFilePath.getAbsolutePath());
+		flinkSourceTable = flinkTableEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str");
+		blinkSourceTable = blinkTableEnv.fromDataStream(sEnv.fromElements("1", "2", "3")).as("str");
+	}
+
+	@After
+	public void cleanEnvironment() throws Exception {
+		FileUtils.deleteDirectory(new File(tmpdir));
+	}
+
+	@Test
+	public void testPythonFunctionFactory() {
+		// flink temporary catalog
+		flinkTableEnv.sqlUpdate("create temporary function func1 as 'test1.func1' language python");
+		verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv);
+
+		// flink temporary system
+		flinkTableEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
+		verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv);
+
+		// blink temporary catalog
+		blinkTableEnv.sqlUpdate("create temporary function func1 as 'test1.func1' language python");
+		verifyPlan(blinkSourceTable.select("func1(str)"), blinkTableEnv);
+
+		// blink temporary system
+		blinkTableEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
+		verifyPlan(blinkSourceTable.select("func1(str)"), blinkTableEnv);
+	}
+
+	private void verifyPlan(Table table, TableEnvironment tableEnvironment) {
+		String plan = tableEnvironment.explain(table);
+		Assert.assertTrue(plan.contains("PythonCalc(select=[func1(f0) AS _c0])"));
+	}
+
+	@BeforeClass
 
 Review comment:
   move this method to the top of the class?


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409258531
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -50,8 +58,8 @@
 /**
  * The util class help to prepare Python env and run the python process.
  */
-final class PythonDriverEnvUtils {
-	private static final Logger LOG = LoggerFactory.getLogger(PythonDriverEnvUtils.class);
+final class PythonEnvUtils {
+	private static final Logger LOG = LoggerFactory.getLogger(PythonDriverEnvUtils.class);;
 
 Review comment:
   PythonDriverEnvUtils -> PythonEnvUtils


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409959350
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +256,63 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
 		return process;
 	}
+
+	/**
+	 * Py4J both supports Java to Python RPC and Python to Java RPC. The GatewayServer object is
+	 * the entry point of Java to Python RPC. Since the Py4j Python client will only be launched
+	 * only once, the GatewayServer object needs to be reused.
+	 */
+	private static GatewayServer gatewayServer = null;
+
+	/**
+	 * Creates a GatewayServer run in a daemon thread.
+	 *
+	 * @return The created GatewayServer
+	 */
+	static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
+		if (gatewayServer != null) {
+			return gatewayServer;
+		}
+		CompletableFuture<GatewayServer> gatewayServerFuture = new CompletableFuture<>();
+		Thread thread = new Thread(() -> {
+			int freePort = NetUtils.getAvailablePort();
+			GatewayServer server = new GatewayServer.GatewayServerBuilder()
+				.gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort)))
+				.javaPort(0)
+				.build();
+			gatewayServerFuture.complete(server);
+			server.start(true);
+		});
+		thread.setName("py4j-gateway");
+		thread.setDaemon(true);
+		thread.start();
+		thread.join();
+		gatewayServer = gatewayServerFuture.get();
+		return gatewayServer;
+	}
+
+	static Process launchPy4jPythonClient(
+			GatewayServer gatewayServer,
+			ReadableConfig config,
+			List<String> commands,
+			String entryPointScript,
+			String tmpDir) throws IOException {
+		PythonEnvironment pythonEnv = PythonEnvUtils.preparePythonEnvironment(
+			config, entryPointScript, tmpDir);
+		// set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python process.
+		pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
+		// set env variable PYFLINK_CALLBACK_PORT for creating callback server in python process.
+		pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", String.valueOf(gatewayServer.getCallbackClient().getPort()));
+		// start the python process.
+		return PythonEnvUtils.startPythonProcess(pythonEnv, commands);
+	}
+
+	static GatewayServer getGatewayServer() {
+		return gatewayServer;
+	}
+
+	static void removeGatewayServer() {
+		gatewayServer.shutdown();
 
 Review comment:
   This should check whether `gatewayServer` is null before calling `shutdown()`.
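
   A minimal sketch of such a guard is shown below. It is self-contained and uses a stand-in `Server` interface instead of py4j's `GatewayServer`; the class name `GatewayHolderSketch`, the `setGatewayServer` method, the `synchronized` modifiers, and resetting the field to `null` after shutdown are all assumptions of this sketch, since the rest of the method is not visible in the quoted hunk.

```java
// Hypothetical illustration of a null-safe removal, not the PR's code.
final class GatewayHolderSketch {

    // Stand-in for py4j's GatewayServer; only shutdown() matters here.
    interface Server {
        void shutdown();
    }

    private static Server gatewayServer = null;

    static synchronized void setGatewayServer(Server server) {
        gatewayServer = server;
    }

    static synchronized Server getGatewayServer() {
        return gatewayServer;
    }

    // Shuts the server down only if one was started. Returns true when a
    // server was actually shut down, false when there was nothing to do.
    static synchronized boolean removeGatewayServer() {
        if (gatewayServer == null) {
            return false;
        }
        gatewayServer.shutdown();
        // Assumption: clear the field so a later start creates a fresh server.
        gatewayServer = null;
        return true;
    }
}
```

   With the guard in place, calling `removeGatewayServer()` before any server was started, or calling it twice, becomes a no-op instead of a `NullPointerException`.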


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409972150
 
 

 ##########
 File path: flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
 ##########
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+
+/**
+ * Tests for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryTest {
+
+	private String tmpdir = "";
+	private BatchTableEnvironment flinkTableEnv;
+	private StreamTableEnvironment blinkTableEnv;
+	private Table flinkSourceTable;
+	private Table blinkSourceTable;
+
+	@Before
 
 Review comment:
   Change to BeforeClass?


[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   ## CI report:
   
   * 99a7954f7800e29e024c303e8428e2e9fe4a5ca0 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160344550) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7499) 
   


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409280559
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 ##########
 @@ -945,73 +941,6 @@ private void dropSystemFunction(DropTempSystemFunctionOperation operation) {
 		}
 	}
 
-	private <T, ACC> void  registerCatalogFunctionInFunctionCatalog(
 
 Review comment:
   Could you split this into a separate commit?

[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409276883
 
 

 ##########
 File path: flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionSqlTest.java
 ##########
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+
+/**
+ * Tests for PythonFunctionSqlTest.
+ */
+public class PythonFunctionSqlTest {
+
+	private String tmpdir = "";
+	private BatchTableEnvironment flinkTableEnv;
+	private StreamTableEnvironment blinkTableEnv;
+	private Table flinkSourceTable;
+	private Table blinkSourceTable;
+
+	private void closeStartedPythonProcess() {
+		synchronized (PythonFunctionFactoryUtil.class) {
+			if (PythonFunctionFactoryUtil.pythonProcessShutdownHook != null) {
+				PythonFunctionFactoryUtil.pythonProcessShutdownHook.run();
+				Runtime.getRuntime().removeShutdownHook(PythonFunctionFactoryUtil.pythonProcessShutdownHook);
+				PythonFunctionFactoryUtil.pythonProcessShutdownHook = null;
+			}
+		}
+	}
+
+	@Before
+	public void prepareEnvironment() throws IOException {
+		closeStartedPythonProcess();
 
 Review comment:
   Could we move closeStartedPythonProcess to a method marked as `@AfterClass`?

[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409973118
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
 ##########
 @@ -33,8 +54,36 @@ public static FunctionDefinition createFunctionDefinition(String name, String cl
 				String.format("Failed instantiating '%s'", className), e);
 		}
 
+		return createFunctionDefinitionInternal(name, (UserDefinedFunction) func);
+	}
+
+	private static FunctionDefinition createPythonFunctionDefinition(
+			String name,
+			String fullyQualifiedName,
+			ReadableConfig config) {
+		Object func;
+		try {
+			Class pythonFunctionFactory = Class.forName(
+				"org.apache.flink.client.python.PythonFunctionFactory",
+				true,
+				Thread.currentThread().getContextClassLoader());
+			func = pythonFunctionFactory.getMethod(
+				"getPythonFunction",
+				String.class,
+				ReadableConfig.class)
+				.invoke(null, fullyQualifiedName, config);
+
 
 Review comment:
   remove the empty line
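
For readers following the diff above: `createPythonFunctionDefinition` resolves `PythonFunctionFactory` by name at runtime rather than linking against it at compile time. The following is a minimal, stdlib-only sketch of that reflective pattern, not the PR's code. It substitutes `java.lang.Integer.parseInt` for the real factory method, and the names `ReflectiveStaticCall` and `invokeStatic` are hypothetical, introduced only for illustration.

```java
import java.lang.reflect.Method;

final class ReflectiveStaticCall {

    /**
     * Loads a class by name through the context class loader and invokes a
     * static one-argument method on it. Reflection failures are rethrown unchecked.
     */
    static Object invokeStatic(String className, String methodName, Class<?> argType, Object arg) {
        try {
            Class<?> clazz = Class.forName(
                className, true, Thread.currentThread().getContextClassLoader());
            Method method = clazz.getMethod(methodName, argType);
            // The receiver is null because the target method is static.
            return method.invoke(null, arg);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                String.format("Failed invoking '%s.%s'", className, methodName), e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the factory lookup: call Integer.parseInt("42") reflectively.
        System.out.println(invokeStatic("java.lang.Integer", "parseInt", String.class, "42"));
    }
}
```

The lookup by name means the calling module needs no compile-time dependency on the class being loaded, at the cost of losing compile-time checking of the method name and signature.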

[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409262193
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
 		return process;
 	}
+
+	static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) {
+		pythonProcess.destroy();
+		try {
+			pythonProcess.waitFor(timeoutMillis, TimeUnit.MILLISECONDS);
+		} catch (InterruptedException e) {
+			throw new RuntimeException("Interrupt while waiting for the python process to stop.", e);
+		}
+		if (pythonProcess.isAlive()) {
+			pythonProcess.destroyForcibly();
+		}
+	}
+
+	private static int findFreePort() throws IOException {
+		ServerSocket socket = new ServerSocket(0);
+		int port = socket.getLocalPort();
+		socket.close();
+		return port;
+	}
+
+	/**
+	 * Creates a GatewayServer run in a daemon thread.
+	 *
+	 * @return The created GatewayServer
+	 */
+	static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
+		CompletableFuture<GatewayServer> gatewayServer = new CompletableFuture<>();
 
 Review comment:
   Rename to gatewayServerFuture
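
The rename makes sense because the variable holds a future, not the server itself. As a rough illustration of the hand-off pattern in `startGatewayServer`, here is a stdlib-only sketch that builds a value on a named daemon thread and returns it through a `CompletableFuture`. The class `DaemonThreadHandoff` and the method `buildOnDaemonThread` are hypothetical names, and a plain `Supplier` stands in for the Py4J `GatewayServer` construction. One assumption beyond the diff: the sketch also completes the future exceptionally on failure, so a caller blocked in `get()` sees the error instead of waiting indefinitely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

final class DaemonThreadHandoff {

    /** Builds a value on a named daemon thread and hands it back through a future. */
    static <T> T buildOnDaemonThread(String threadName, Supplier<T> factory) {
        CompletableFuture<T> resultFuture = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                resultFuture.complete(factory.get());
            } catch (Throwable t) {
                // Propagate the failure so the caller does not block forever in get().
                resultFuture.completeExceptionally(t);
            }
        });
        thread.setName(threadName);
        thread.setDaemon(true);
        thread.start();
        try {
            return resultFuture.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before converting to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for " + threadName, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed on thread " + threadName, e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(buildOnDaemonThread("py4j-gateway", () -> "server"));
    }
}
```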

[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7499",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "PENDING",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160344550",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 99a7954f7800e29e024c303e8428e2e9fe4a5ca0 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160344550) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7499) 
   
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409277200
 
 

 ##########
 File path: flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionSqlTest.java
 ##########
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+
+/**
+ * Tests for PythonFunctionSqlTest.
+ */
+public class PythonFunctionSqlTest {
 
 Review comment:
   Rename it to PythonFunctionFactoryTest

[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409259043
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
 		return process;
 	}
+
+	static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) {
+		pythonProcess.destroy();
+		try {
+			pythonProcess.waitFor(timeoutMillis, TimeUnit.MILLISECONDS);
+		} catch (InterruptedException e) {
+			throw new RuntimeException("Interrupt while waiting for the python process to stop.", e);
+		}
+		if (pythonProcess.isAlive()) {
+			pythonProcess.destroyForcibly();
+		}
+	}
+
+	private static int findFreePort() throws IOException {
+		ServerSocket socket = new ServerSocket(0);
+		int port = socket.getLocalPort();
+		socket.close();
+		return port;
+	}
+
+	/**
+	 * Creates a GatewayServer run in a daemon thread.
+	 *
+	 * @return The created GatewayServer
+	 */
+	static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
+		CompletableFuture<GatewayServer> gatewayServer = new CompletableFuture<>();
+		Thread thread = new Thread(() -> {
+			int freePort;
+			try {
+				freePort = findFreePort();
+			} catch (IOException e) {
+				throw new RuntimeException("Could not find a free port for Py4jCallbackClient.");
+			}
+			GatewayServer server = new GatewayServer.GatewayServerBuilder()
+				.gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort)))
+				.javaPort(0)
+				.build();
+			gatewayServer.complete(server);
+			server.start(true);
+		});
+		thread.setName("py4j-gateway");
+		thread.setDaemon(true);
+		thread.start();
+		thread.join();
+		return gatewayServer.get();
+	}
+
+	static Process launchPy4jPythonClient(
+		GatewayServer gatewayServer,
+		ReadableConfig config,
+		List<String> commands,
+		String entryPointScript,
+		String tmpDir) throws IOException {
+		PythonEnvUtils.PythonEnvironment pythonEnv = PythonEnvUtils.preparePythonEnvironment(
+			config, entryPointScript, tmpDir);
+		// set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python process.
+		pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
+		// set env variable PYFLINK_CALLBACK_PORT for creating callback server in python process.
+		pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", String.valueOf(gatewayServer.getCallbackClient().getPort()));
+		// start the python process.
+		return PythonEnvUtils.startPythonProcess(pythonEnv, commands);
+	}
+
+	static Process launchPy4jPythonClient(
 
 Review comment:
   remove this method

[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409258002
 
 

 ##########
 File path: flink-python/pyflink/table/tests/test_table_environment_api.py
 ##########
 @@ -329,6 +329,13 @@ def test_table_environment_with_blink_planner(self):
 
         self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
 
+    def test_sql_ddl(self):
+        self.t_env.sql_update("create temporary function func1 as "
+                              "'pyflink.table.tests.test_udf.add' language python")
+        table = self.t_env.from_elements([(1, 2)]).alias("a, b").select("func1(a, b)")
+        plan = self.t_env.explain(table)
+        self.assertTrue(plan.find("DataStreamPythonCalc(select=[add(f0, f1) AS _c0])") >= 0)
+
 
 Review comment:
   Could you change this to `plan.find("PythonCalc(xxx)")`, so that the test case `test_sql_ddl` can be shared between batch and stream?
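
The idea behind this suggestion is to match only the planner-independent suffix of the node name. A small sketch of that check, written in Java to match the rest of the diffs in this thread, might look like the following. The class and method names are hypothetical, and the batch plan string (`DataSetPythonCalc`) is an assumed example, not output taken from this PR; only the stream plan string appears in the test above.

```java
final class PlanAssertionSketch {

    /** Returns true if the explained plan mentions a Python calc node, whatever its planner-specific prefix. */
    static boolean containsPythonCalc(String plan, String calcArguments) {
        // Matching on "PythonCalc(" ignores prefixes such as "DataStream".
        return plan.contains("PythonCalc(" + calcArguments + ")");
    }

    public static void main(String[] args) {
        String arguments = "select=[add(f0, f1) AS _c0]";
        // Stream plan fragment as asserted in test_sql_ddl.
        String streamPlan = "DataStreamPythonCalc(" + arguments + ")";
        // Assumed batch plan fragment, for illustration only.
        String batchPlan = "DataSetPythonCalc(" + arguments + ")";
        System.out.println(containsPythonCalc(streamPlan, arguments));
        System.out.println(containsPythonCalc(batchPlan, arguments));
    }
}
```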

[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7499",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160344550",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "status" : "FAILURE",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160553016",
       "triggerID" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7601",
       "triggerID" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7cbe2380ac7246cf703397d9a78d002c0491a959",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7621",
       "triggerID" : "7cbe2380ac7246cf703397d9a78d002c0491a959",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7cbe2380ac7246cf703397d9a78d002c0491a959",
       "status" : "PENDING",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160664905",
       "triggerID" : "7cbe2380ac7246cf703397d9a78d002c0491a959",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d2603c1936ff28ab745c711224857bba0693edd8 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160553016) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7601) 
   * 7cbe2380ac7246cf703397d9a78d002c0491a959 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160664905) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7621) 
   
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409265616
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonFunctionFactoryUtil.java
 ##########
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import py4j.GatewayServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.client.python.PythonEnvUtils.getGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient;
+import static org.apache.flink.client.python.PythonEnvUtils.setGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.shutdownPythonProcess;
+
+/**
+ * Utils for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryUtil {
+
+	private static final long CHECK_INTERVAL = 100;
+
+	private static final long TIMEOUT_MILLIS = 3000;
+
+	private static PythonFunctionFactory pythonFunctionFactory = null;
+
+	@VisibleForTesting
+	static Thread pythonProcessShutdownHook = null;
+
+	public static synchronized PythonFunctionFactory getPythonFunctionFactory(ReadableConfig config)
+			throws IOException, ExecutionException, InterruptedException {
+		if (pythonFunctionFactory != null) {
+			return pythonFunctionFactory;
+		} else {
+			if (getGatewayServer() == null) {
+				GatewayServer gatewayServer = PythonEnvUtils.startGatewayServer();
+				List<String> commands = new ArrayList<>();
+				commands.add("-m");
+				commands.add("pyflink.java_rpc_server");
+				String tmpDir = System.getProperty("java.io.tmpdir") +
+					File.separator + "pyflink" + File.separator + UUID.randomUUID();
+				Process pythonProcess = launchPy4jPythonClient(gatewayServer, config, commands, tmpDir);
+
+				Map<String, Object> entryPoint = (Map<String, Object>) gatewayServer.getGateway().getEntryPoint();
+				int i = 0;
+				try {
+					while (!entryPoint.containsKey("PythonFunctionFactory")) {
+						if (!pythonProcess.isAlive()) {
+							throw new RuntimeException("Python process environment start failed!");
+						}
+						try {
+							Thread.sleep(CHECK_INTERVAL);
+						} catch (InterruptedException e) {
+							throw new RuntimeException("Interrupt while waiting for the python process to start.", e);
 
 Review comment:
   Typo in the exception message: "Interrupt" -> "Interrupted".
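
For context, the loop in the diff polls the gateway entry point until the Python process registers `PythonFunctionFactory`. Below is a stdlib-only sketch of such a polling loop, under stated assumptions: the class `EntryPointPolling`, the method `awaitKey`, and the `producerAlive` callback (standing in for `pythonProcess.isAlive()`) are hypothetical names. Unlike the diff, the sketch adds an overall deadline and restores the thread's interrupt flag before rethrowing; both are design choices of this example, not something the PR is confirmed to do.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class EntryPointPolling {

    /**
     * Polls until the key appears in the map. Returns false if the deadline
     * passes, or if the producer dies without having registered the key.
     */
    static boolean awaitKey(
            Map<String, Object> entryPoint,
            String key,
            BooleanSupplier producerAlive,
            long checkIntervalMillis,
            long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!entryPoint.containsKey(key)) {
            if (!producerAlive.getAsBoolean()) {
                // Re-check once: the producer may have registered the key just before exiting.
                return entryPoint.containsKey(key);
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(checkIntervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                    "Interrupted while waiting for the python process to start.", e);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> entryPoint = new ConcurrentHashMap<>();
        entryPoint.put("PythonFunctionFactory", new Object());
        System.out.println(awaitKey(entryPoint, "PythonFunctionFactory", () -> true, 100, 3000));
    }
}
```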

[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7499",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "FAILURE",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160344550",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "status" : "PENDING",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160553016",
       "triggerID" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7601",
       "triggerID" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 99a7954f7800e29e024c303e8428e2e9fe4a5ca0 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160344550) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7499) 
   * d2603c1936ff28ab745c711224857bba0693edd8 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160553016) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7601) 
   
[GitHub] [flink] flinkbot commented on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 99a7954f7800e29e024c303e8428e2e9fe4a5ca0 UNKNOWN
   
[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7499",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160344550",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "status" : "FAILURE",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160553016",
       "triggerID" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7601",
       "triggerID" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7cbe2380ac7246cf703397d9a78d002c0491a959",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7cbe2380ac7246cf703397d9a78d002c0491a959",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d2603c1936ff28ab745c711224857bba0693edd8 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160553016) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7601) 
   * 7cbe2380ac7246cf703397d9a78d002c0491a959 UNKNOWN
   
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409262680
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
 		return process;
 	}
+
+	static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) {
+		pythonProcess.destroy();
+		try {
+			pythonProcess.waitFor(timeoutMillis, TimeUnit.MILLISECONDS);
+		} catch (InterruptedException e) {
+			throw new RuntimeException("Interrupt while waiting for the python process to stop.", e);
+		}
+		if (pythonProcess.isAlive()) {
+			pythonProcess.destroyForcibly();
+		}
+	}
+
+	private static int findFreePort() throws IOException {
 
 Review comment:
   Use `NetUtils.getAvailablePort` instead.
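
   For context, a minimal sketch of what a free-port lookup generally does. `FreePortSketch` is a hypothetical name used only for illustration; this is not Flink's `NetUtils` implementation. It uses try-with-resources so the socket is closed even if an exception is thrown.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical illustration only; not the Flink NetUtils code.
final class FreePortSketch {

    // Binding to port 0 asks the operating system to pick any free port.
    static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }
}
```

   Note that any lookup of this kind is inherently racy: another process may claim the port between closing the probe socket and binding the real one.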


[GitHub] [flink] WeiZhong94 commented on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
WeiZhong94 commented on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-614637648
 
 
   @dianfu Thanks for your review! I have addressed your comments in the latest commits.


[GitHub] [flink] WeiZhong94 commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
WeiZhong94 commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409535114
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
 		return process;
 	}
+
+	static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) {
+		pythonProcess.destroy();
+		try {
+			pythonProcess.waitFor(timeoutMillis, TimeUnit.MILLISECONDS);
+		} catch (InterruptedException e) {
+			throw new RuntimeException("Interrupt while waiting for the python process to stop.", e);
+		}
+		if (pythonProcess.isAlive()) {
+			pythonProcess.destroyForcibly();
+		}
+	}
+
+	private static int findFreePort() throws IOException {
+		ServerSocket socket = new ServerSocket(0);
+		int port = socket.getLocalPort();
+		socket.close();
+		return port;
+	}
+
+	/**
+	 * Creates a GatewayServer run in a daemon thread.
+	 *
+	 * @return The created GatewayServer
+	 */
+	static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
+		CompletableFuture<GatewayServer> gatewayServer = new CompletableFuture<>();
+		Thread thread = new Thread(() -> {
+			int freePort;
+			try {
+				freePort = findFreePort();
+			} catch (IOException e) {
+				throw new RuntimeException("Could not find a free port for Py4jCallbackClient.");
+			}
+			GatewayServer server = new GatewayServer.GatewayServerBuilder()
+				.gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort)))
+				.javaPort(0)
 
 Review comment:
   Because `GatewayServer` uses the loopback address by default.


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409276408
 
 

 ##########
 File path: flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionSqlTest.java
 ##########
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+
+/**
+ * Tests for PythonFunctionSqlTest.
 
 Review comment:
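   Please correct this comment: the class is `PythonFunctionSqlTest` itself, so "Tests for PythonFunctionSqlTest" is circular.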


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409970315
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
 ##########
 @@ -82,4 +81,8 @@ public static void main(String[] args) throws IOException {
 			System.exit(1);
 		}
 	}
+
+	public static boolean ping() {
 
 Review comment:
   This method is no longer needed and should be removed.


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409261220
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
 		return process;
 	}
+
+	static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) {
 
 Review comment:
   This method seems generic and is not specific to Python. What about moving it to `PythonFunctionFactoryUtil`, where it is used?


[GitHub] [flink] flinkbot commented on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613897747
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 99a7954f7800e29e024c303e8428e2e9fe4a5ca0 (Wed Apr 15 08:30:12 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409262374
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
 		return process;
 	}
+
+	static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) {
+		pythonProcess.destroy();
+		try {
+			pythonProcess.waitFor(timeoutMillis, TimeUnit.MILLISECONDS);
+		} catch (InterruptedException e) {
+			throw new RuntimeException("Interrupt while waiting for the python process to stop.", e);
+		}
+		if (pythonProcess.isAlive()) {
+			pythonProcess.destroyForcibly();
+		}
+	}
+
+	private static int findFreePort() throws IOException {
+		ServerSocket socket = new ServerSocket(0);
+		int port = socket.getLocalPort();
+		socket.close();
+		return port;
+	}
+
+	/**
+	 * Creates a GatewayServer run in a daemon thread.
+	 *
+	 * @return The created GatewayServer
+	 */
+	static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
+		CompletableFuture<GatewayServer> gatewayServer = new CompletableFuture<>();
+		Thread thread = new Thread(() -> {
+			int freePort;
+			try {
+				freePort = findFreePort();
+			} catch (IOException e) {
+				throw new RuntimeException("Could not find a free port for Py4jCallbackClient.");
 
 Review comment:
   We should catch the exception and complete `gatewayServerFuture` with `completeExceptionally` when an exception occurs.
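
   A minimal sketch of the pattern, assuming the start-up work runs in a daemon thread as in the diff above. `FutureFailureSketch` and `startInDaemonThread` are hypothetical names; the `Supplier` stands in for the real gateway start-up logic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical illustration only; the Supplier replaces the real GatewayServer start-up.
final class FutureFailureSketch {

    // Runs the start-up work in a daemon thread and reports either the result
    // or the failure through the returned future.
    static <T> CompletableFuture<T> startInDaemonThread(Supplier<T> startup) {
        CompletableFuture<T> future = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                future.complete(startup.get());
            } catch (Throwable t) {
                // If the exception only escaped the thread, the future would never
                // complete and a caller blocked in get() would wait forever.
                future.completeExceptionally(t);
            }
        });
        thread.setDaemon(true);
        thread.start();
        return future;
    }
}
```

   With this shape, the caller's `get()` fails with an `ExecutionException` wrapping the original cause instead of hanging.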


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409260255
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
 ##########
 @@ -67,20 +69,22 @@ public static void main(String[] args) {
 		Configuration config = ExecutionEnvironment.getExecutionEnvironment().getConfiguration();
 
 		// start gateway server
-		GatewayServer gatewayServer = startGatewayServer();
+		GatewayServer gatewayServer = PythonEnvUtils.startGatewayServer();
 
 		// commands which will be exec in python progress.
 		final List<String> commands = constructPythonCommands(pythonDriverOptions);
 		try {
 			// prepare the exec environment of python progress.
 			String tmpDir = System.getProperty("java.io.tmpdir") +
 				File.separator + "pyflink" + File.separator + UUID.randomUUID();
-			PythonDriverEnvUtils.PythonEnvironment pythonEnv = PythonDriverEnvUtils.preparePythonEnvironment(
-				config, pythonDriverOptions.getEntryPointScript().orElse(null), tmpDir);
-			// set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python progress.
-			pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
 			// start the python process.
-			Process pythonProcess = PythonDriverEnvUtils.startPythonProcess(pythonEnv, commands);
+			Process pythonProcess = PythonEnvUtils.launchPy4jPythonClient(
+				gatewayServer,
+				config,
+				commands,
+				pythonDriverOptions.getEntryPointScript().orElse(null),
+				tmpDir);
+			setGatewayServer(gatewayServer);
 
 Review comment:
   There is no need to set the gatewayServer to a static variable in PythonDriver. What do you think?


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409263779
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonFunctionFactory.java
 ##########
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.functions.python.PythonFunction;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The factory which creates the PythonFunction objects from given module name and object name.
+ */
+public interface PythonFunctionFactory {
+
+	/**
+	 * Returns PythonFunction according to moduleName and objectName. The current environment is also
+	 * needed because different environments have different code generation logic.
+	 *
+	 * @param moduleName The module name of the Python UDF.
+	 * @param objectName The function name / class name of the Python UDF.
+	 * @return The PythonFunction object which represents the Python UDF.
+	 */
+	PythonFunction getPythonFunction(String moduleName, String objectName);
+
+	/**
+	 * Returns PythonFunction according to the fully qualified name of the Python UDF
+	 * i.e ${moduleName}.${functionName} or ${moduleName}.${className}. The current environment is also
+	 * needed because different environments have different code generation logic.
+	 *
+	 * @param fullyQualifiedName The fully qualified name of the Python UDF.
+	 * @param config The configuration of python dependencies.
+	 * @return The PythonFunction object which represents the Python UDF.
+	 */
+	static PythonFunction getPythonFunction(String fullyQualifiedName, ReadableConfig config)
+		throws IOException, ExecutionException, InterruptedException {
+		int splitIndex = fullyQualifiedName.lastIndexOf(".");
+		String moduleName = fullyQualifiedName.substring(0, splitIndex);
 
 Review comment:
   What happens if there is no `.` in the fullyQualifiedName?
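
   For reference: `lastIndexOf` returns -1 when there is no `.`, so `substring(0, splitIndex)` throws a `StringIndexOutOfBoundsException`. A minimal sketch of an explicit check, with `QualifiedNameSketch` and its error message being hypothetical rather than part of this PR:

```java
// Hypothetical illustration only; names and message are not taken from the PR.
final class QualifiedNameSketch {

    // Splits "module.object" at the last dot and rejects names that lack a
    // module or object part instead of failing inside substring().
    static String[] split(String fullyQualifiedName) {
        int splitIndex = fullyQualifiedName.lastIndexOf('.');
        if (splitIndex <= 0 || splitIndex == fullyQualifiedName.length() - 1) {
            throw new IllegalArgumentException(
                "Expected <module>.<object> but got: " + fullyQualifiedName);
        }
        return new String[] {
            fullyQualifiedName.substring(0, splitIndex),
            fullyQualifiedName.substring(splitIndex + 1)
        };
    }
}
```

   Failing early with a descriptive message makes a malformed function name in the DDL easier to diagnose than an index error.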


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409253106
 
 

 ##########
 File path: flink-python/pyflink/java_rpc_server.py
 ##########
 @@ -0,0 +1,32 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from pyflink.java_gateway import get_gateway
+
+
+if __name__ == '__main__':
+    # just a daemon process used to serve the rpc call from Java.
+    gateway = get_gateway()
+    try:
+        # a simple watch dog
+        while gateway.jvm.PythonGatewayServer.ping():
+            time.sleep(10000)
+    finally:
+        get_gateway().close()
 
 Review comment:
   get_gateway() -> gateway


[GitHub] [flink] WeiZhong94 commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
WeiZhong94 commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409535801
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonFunctionFactoryUtil.java
 ##########
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import py4j.GatewayServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.client.python.PythonEnvUtils.getGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient;
+import static org.apache.flink.client.python.PythonEnvUtils.setGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.shutdownPythonProcess;
+
+/**
+ * Utils for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryUtil {
+
+	private static final long CHECK_INTERVAL = 100;
+
+	private static final long TIMEOUT_MILLIS = 3000;
+
+	private static PythonFunctionFactory pythonFunctionFactory = null;
+
+	@VisibleForTesting
+	static Thread pythonProcessShutdownHook = null;
+
+	public static synchronized PythonFunctionFactory getPythonFunctionFactory(ReadableConfig config)
+			throws IOException, ExecutionException, InterruptedException {
+		if (pythonFunctionFactory != null) {
+			return pythonFunctionFactory;
+		} else {
+			if (getGatewayServer() == null) {
 
 Review comment:
   If the entry point of the Java process is `PythonDriver` or `PythonGatewayServer`, it is possible to hit this situation.


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409266063
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonFunctionFactoryUtil.java
 ##########
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import py4j.GatewayServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.client.python.PythonEnvUtils.getGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient;
+import static org.apache.flink.client.python.PythonEnvUtils.setGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.shutdownPythonProcess;
+
+/**
+ * Utils for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryUtil {
+
+	private static final long CHECK_INTERVAL = 100;
+
+	private static final long TIMEOUT_MILLIS = 3000;
 
 Review comment:
   This seems too small to me. Change it to 10000?


[GitHub] [flink] WeiZhong94 commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
WeiZhong94 commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409534525
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
 ##########
 @@ -67,20 +69,22 @@ public static void main(String[] args) {
 		Configuration config = ExecutionEnvironment.getExecutionEnvironment().getConfiguration();
 
 		// start gateway server
-		GatewayServer gatewayServer = startGatewayServer();
+		GatewayServer gatewayServer = PythonEnvUtils.startGatewayServer();
 
 		// commands which will be exec in python progress.
 		final List<String> commands = constructPythonCommands(pythonDriverOptions);
 		try {
 			// prepare the exec environment of python progress.
 			String tmpDir = System.getProperty("java.io.tmpdir") +
 				File.separator + "pyflink" + File.separator + UUID.randomUUID();
-			PythonDriverEnvUtils.PythonEnvironment pythonEnv = PythonDriverEnvUtils.preparePythonEnvironment(
-				config, pythonDriverOptions.getEntryPointScript().orElse(null), tmpDir);
-			// set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python progress.
-			pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
 			// start the python process.
-			Process pythonProcess = PythonDriverEnvUtils.startPythonProcess(pythonEnv, commands);
+			Process pythonProcess = PythonEnvUtils.launchPy4jPythonClient(
+				gatewayServer,
+				config,
+				commands,
+				pythonDriverOptions.getEntryPointScript().orElse(null),
+				tmpDir);
+			setGatewayServer(gatewayServer);
 
 Review comment:
   Since `PythonDriver`, `PythonGatewayServer` and `PythonFunctionFactory` will share the same gatewayServer object, it is necessary to store the gatewayServer in a static variable here.


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409265928
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonFunctionFactoryUtil.java
 ##########
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import py4j.GatewayServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.client.python.PythonEnvUtils.getGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient;
+import static org.apache.flink.client.python.PythonEnvUtils.setGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.shutdownPythonProcess;
+
+/**
+ * Utils for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryUtil {
+
+	private static final long CHECK_INTERVAL = 100;
+
+	private static final long TIMEOUT_MILLIS = 3000;
+
+	private static PythonFunctionFactory pythonFunctionFactory = null;
+
+	@VisibleForTesting
+	static Thread pythonProcessShutdownHook = null;
+
+	public static synchronized PythonFunctionFactory getPythonFunctionFactory(ReadableConfig config)
+			throws IOException, ExecutionException, InterruptedException {
+		if (pythonFunctionFactory != null) {
+			return pythonFunctionFactory;
+		} else {
+			if (getGatewayServer() == null) {
+				GatewayServer gatewayServer = PythonEnvUtils.startGatewayServer();
+				List<String> commands = new ArrayList<>();
+				commands.add("-m");
+				commands.add("pyflink.java_rpc_server");
+				String tmpDir = System.getProperty("java.io.tmpdir") +
+					File.separator + "pyflink" + File.separator + UUID.randomUUID();
+				Process pythonProcess = launchPy4jPythonClient(gatewayServer, config, commands, tmpDir);
+
+				Map<String, Object> entryPoint = (Map<String, Object>) gatewayServer.getGateway().getEntryPoint();
+				int i = 0;
+				try {
+					while (!entryPoint.containsKey("PythonFunctionFactory")) {
+						if (!pythonProcess.isAlive()) {
+							throw new RuntimeException("Python process environment start failed!");
 
 Review comment:
   Python process environment -> Python callback server


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409970974
 
 

 ##########
 File path: flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
 ##########
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+
+/**
+ * Tests for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryTest {
+
+	private String tmpdir = "";
+	private BatchTableEnvironment flinkTableEnv;
+	private StreamTableEnvironment blinkTableEnv;
+	private Table flinkSourceTable;
+	private Table blinkSourceTable;
+
+	@Before
+	public void prepareEnvironment() throws Exception {
+		tmpdir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()).getAbsolutePath();
+		new File(tmpdir).mkdir();
+		File pyFilePath = new File(tmpdir, "test1.py");
+		try (OutputStream out = new FileOutputStream(pyFilePath)) {
+			String code = ""
+				+ "from pyflink.table.udf import udf\n"
+				+ "from pyflink.table import DataTypes\n"
+				+ "@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())\n"
+				+ "def func1(str):\n"
+				+ "    return str + str\n";
+			out.write(code.getBytes());
+		}
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		flinkTableEnv = BatchTableEnvironment.create(env);
+		flinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, pyFilePath.getAbsolutePath());
+		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		blinkTableEnv = StreamTableEnvironment.create(
+			sEnv, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
+		blinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, pyFilePath.getAbsolutePath());
+		flinkSourceTable = flinkTableEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str");
+		blinkSourceTable = blinkTableEnv.fromDataStream(sEnv.fromElements("1", "2", "3")).as("str");
+	}
+
+	@After
+	public void cleanEnvironment() throws Exception {
+		FileUtils.deleteDirectory(new File(tmpdir));
+	}
+
+	@Test
+	public void testPythonFunctionFactory() {
+		// flink temporary catalog
+		flinkTableEnv.sqlUpdate("create temporary function func1 as 'test1.func1' language python");
+		verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv);
+
+		// flink temporary system
+		flinkTableEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
+		verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv);
+
+		// blink temporary catalog
+		blinkTableEnv.sqlUpdate("create temporary function func1 as 'test1.func1' language python");
+		verifyPlan(blinkSourceTable.select("func1(str)"), blinkTableEnv);
+
+		// blink temporary system
+		blinkTableEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
+		verifyPlan(blinkSourceTable.select("func1(str)"), blinkTableEnv);
+	}
+
+	private void verifyPlan(Table table, TableEnvironment tableEnvironment) {
+		String plan = tableEnvironment.explain(table);
+		Assert.assertTrue(plan.contains("PythonCalc(select=[func1(f0) AS _c0])"));
+	}
+
+	@BeforeClass
+	@AfterClass
 
 Review comment:
   I think `@AfterClass` alone is enough. What do you think?


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409278316
 
 

 ##########
 File path: flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionSqlTest.java
 ##########
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+
+/**
+ * Tests for PythonFunctionSqlTest.
+ */
+public class PythonFunctionSqlTest {
+
+	private String tmpdir = "";
+	private BatchTableEnvironment flinkTableEnv;
+	private StreamTableEnvironment blinkTableEnv;
+	private Table flinkSourceTable;
+	private Table blinkSourceTable;
+
+	private void closeStartedPythonProcess() {
+		synchronized (PythonFunctionFactoryUtil.class) {
+			if (PythonFunctionFactoryUtil.pythonProcessShutdownHook != null) {
+				PythonFunctionFactoryUtil.pythonProcessShutdownHook.run();
+				Runtime.getRuntime().removeShutdownHook(PythonFunctionFactoryUtil.pythonProcessShutdownHook);
+				PythonFunctionFactoryUtil.pythonProcessShutdownHook = null;
+			}
+		}
+	}
+
+	@Before
+	public void prepareEnvironment() throws IOException {
+		closeStartedPythonProcess();
+		tmpdir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()).getAbsolutePath();
+		new File(tmpdir).mkdir();
+		File pyFilePath = new File(tmpdir, "test1.py");
+		try (OutputStream out = new FileOutputStream(pyFilePath)) {
+			String code = ""
+				+ "from pyflink.table.udf import udf\n"
+				+ "from pyflink.table import DataTypes\n"
+				+ "@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())\n"
+				+ "def func1(str):\n"
+				+ "    return str + str\n";
+			out.write(code.getBytes());
+		}
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		flinkTableEnv = BatchTableEnvironment.create(env);
+		flinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, pyFilePath.getAbsolutePath());
+		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		blinkTableEnv = StreamTableEnvironment.create(
+			sEnv, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
+		blinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, pyFilePath.getAbsolutePath());
+		flinkSourceTable = flinkTableEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str");
+		blinkSourceTable = blinkTableEnv.fromDataStream(sEnv.fromElements("1", "2", "3")).as("str");
+	}
+
+	@After
+	public void cleanEnvironment() throws IOException {
+		closeStartedPythonProcess();
+		FileUtils.deleteDirectory(new File(tmpdir));
+	}
+
+	@Test
+	public void testPythonFunctionFactory() {
+		// flink temporary catalog
+		flinkTableEnv.sqlUpdate("create temporary function func1 as 'test1.func1' language python");
+		Table flinkTable = flinkSourceTable.select("func1(str)");
+		String flinkPlan = flinkTableEnv.explain(flinkTable);
+		Assert.assertTrue(flinkPlan.contains("DataSetPythonCalc(select=[func1(f0) AS _c0])"));
 
 Review comment:
   Change `DataSetPythonCalc` to `PythonCalc` and refactor the test case a bit to remove the code duplication? 


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409266063
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonFunctionFactoryUtil.java
 ##########
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import py4j.GatewayServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.client.python.PythonEnvUtils.getGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient;
+import static org.apache.flink.client.python.PythonEnvUtils.setGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.shutdownPythonProcess;
+
+/**
+ * Utils for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryUtil {
+
+	private static final long CHECK_INTERVAL = 100;
+
+	private static final long TIMEOUT_MILLIS = 3000;
 
 Review comment:
   This seems too small to me. Change it to 10000?
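
To illustrate how the two constants interact, here is a minimal sketch of a poll-until-ready loop with a check interval and an overall timeout. The class name `WaitUtilSketch`, the method `waitUntil`, and the constant values are assumptions made for illustration; this is not the code in the pull request.

```java
import java.util.function.BooleanSupplier;

final class WaitUtilSketch {

    // Hypothetical values; the interval mirrors the constant under review,
    // the timeout uses the larger value suggested in the comment.
    static final long CHECK_INTERVAL_MILLIS = 100;
    static final long TIMEOUT_MILLIS = 10_000;

    /**
     * Polls the condition every checkIntervalMillis until it holds or the
     * timeout elapses. Returns true if the condition became true in time.
     */
    public static boolean waitUntil(
            BooleanSupplier condition, long checkIntervalMillis, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            // Overflow-safe comparison against the deadline.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(checkIntervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

With a 100 ms interval, a 3000 ms timeout allows roughly 30 checks and a 10000 ms timeout roughly 100. A larger timeout only costs time when the awaited process never becomes ready.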


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409972882
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
 ##########
 @@ -18,13 +18,34 @@
 
 package org.apache.flink.table.functions;
 
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.FunctionLanguage;
+
+import java.lang.reflect.InvocationTargetException;
+
 /**
  * A util to instantiate {@link FunctionDefinition} in the default way.
  */
 public class FunctionDefinitionUtil {
 
 	public static FunctionDefinition createFunctionDefinition(String name, String className) {
-		// Currently only handles Java class-based functions
+		return createJavaFunctionDefinition(name, className);
+	}
+
+	public static FunctionDefinition createFunctionDefinition(
+			String name,
+			String className,
+			FunctionLanguage functionLanguage,
+			ReadableConfig config) {
+		switch (functionLanguage) {
 
 Review comment:
   My IDE complains that `switch has too few case labels`. Could you resolve this warning?
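
One common way to silence this kind of inspection is to replace a `switch` that has a single meaningful case with a plain `if`. The sketch below shows that shape only; the `Language` enum and the `resolveFactory` method are hypothetical stand-ins, not the real Flink types or the change that was eventually made.

```java
final class FunctionDefinitionSketch {

    // Hypothetical stand-in for the function language enum.
    enum Language { JAVA, SCALA, PYTHON }

    /** Returns a label for the instantiation path that would be taken. */
    public static String resolveFactory(Language language) {
        if (language == Language.PYTHON) {
            // Only Python needs the special path.
            return "python";
        }
        // All JVM-based languages share the class-based path.
        return "jvm";
    }
}
```

If more languages needed their own path later, moving back to a `switch` would be straightforward.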


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409276497
 
 

 ##########
 File path: flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionSqlTest.java
 ##########
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+
+/**
+ * Tests for PythonFunctionSqlTest.
+ */
+public class PythonFunctionSqlTest {
+
+	private String tmpdir = "";
+	private BatchTableEnvironment flinkTableEnv;
+	private StreamTableEnvironment blinkTableEnv;
+	private Table flinkSourceTable;
+	private Table blinkSourceTable;
+
+	private void closeStartedPythonProcess() {
 
 Review comment:
   Move the private method to the bottom of the class


[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7499",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160344550",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "status" : "FAILURE",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160553016",
       "triggerID" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7601",
       "triggerID" : "d2603c1936ff28ab745c711224857bba0693edd8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d2603c1936ff28ab745c711224857bba0693edd8 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160553016) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7601) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409958033
 
 

 ##########
 File path: flink-python/pyflink/table/tests/test_table_environment_api.py
 ##########
 @@ -412,6 +419,13 @@ def test_table_environment_with_blink_planner(self):
 
         self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
 
+    def test_sql_ddl(self):
 
 Review comment:
   It seems that this could be removed, as it already exists in the parent class.


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409257458
 
 

 ##########
 File path: flink-python/pyflink/java_rpc_server.py
 ##########
 @@ -0,0 +1,32 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from pyflink.java_gateway import get_gateway
+
+
+if __name__ == '__main__':
+    # just a daemon process used to serve the rpc call from Java.
+    gateway = get_gateway()
+    try:
+        # a simple watch dog
+        while gateway.jvm.PythonGatewayServer.ping():
 
 Review comment:
   Monitor whether the connection is broken just like the PythonGatewayServer does?


[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7499",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "status" : "PENDING",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160344550",
       "triggerID" : "99a7954f7800e29e024c303e8428e2e9fe4a5ca0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 99a7954f7800e29e024c303e8428e2e9fe4a5ca0 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160344550) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7499) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409257180
 
 

 ##########
 File path: flink-python/pyflink/java_rpc_server.py
 ##########
 @@ -0,0 +1,32 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from pyflink.java_gateway import get_gateway
+
+
+if __name__ == '__main__':
 
 Review comment:
   What about renaming the file to `python_callback_server`?


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409265142
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonFunctionFactoryUtil.java
 ##########
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import py4j.GatewayServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.client.python.PythonEnvUtils.getGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient;
+import static org.apache.flink.client.python.PythonEnvUtils.setGatewayServer;
+import static org.apache.flink.client.python.PythonEnvUtils.shutdownPythonProcess;
+
+/**
+ * Utils for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryUtil {
+
+	private static final long CHECK_INTERVAL = 100;
+
+	private static final long TIMEOUT_MILLIS = 3000;
+
+	private static PythonFunctionFactory pythonFunctionFactory = null;
+
+	@VisibleForTesting
+	static Thread pythonProcessShutdownHook = null;
+
+	public static synchronized PythonFunctionFactory getPythonFunctionFactory(ReadableConfig config)
+			throws IOException, ExecutionException, InterruptedException {
+		if (pythonFunctionFactory != null) {
+			return pythonFunctionFactory;
+		} else {
+			if (getGatewayServer() == null) {
 
 Review comment:
   If pythonFunctionFactory is null, the gateway server should also be null. Did I miss something?


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409261220
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
 		return process;
 	}
+
+	static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) {
 
 Review comment:
   This method seems very common and is not specific to Python. What about moving it to PythonFunctionFactoryUtil, where it's used?
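
   The body of shutdownPythonProcess is not shown in this hunk. As a rough illustration of why such a helper is not Python-specific, here is a minimal sketch that uses only the standard java.lang.Process API. The class name ProcessShutdownSketch and the method name shutdownProcess are hypothetical, and the logic is an assumption about what a generic "stop with timeout" helper might do, not the PR's implementation.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not taken from the PR: stops any child process,
// regardless of what program it runs.
final class ProcessShutdownSketch {

    /**
     * Requests a normal termination, waits up to timeoutMillis for the
     * process to exit, and falls back to a forcible kill if it is still alive.
     */
    static void shutdownProcess(Process process, long timeoutMillis) throws InterruptedException {
        if (process == null || !process.isAlive()) {
            return; // nothing to stop
        }
        process.destroy(); // polite termination request
        if (!process.waitFor(timeoutMillis, TimeUnit.MILLISECONDS)) {
            // Still running after the grace period: kill it and wait again.
            process.destroyForcibly().waitFor(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }
}
```

   Nothing in the sketch depends on the child being a Python interpreter, which is the point of the suggestion to move the method next to its only caller.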


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409959968
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +256,63 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
 		return process;
 	}
+
+	/**
+	 * Py4J supports both Java to Python RPC and Python to Java RPC. The GatewayServer object is
+	 * the entry point of Java to Python RPC. Since the Py4j Python client will be launched
+	 * only once, the GatewayServer object needs to be reused.
+	 */
+	private static GatewayServer gatewayServer = null;
+
+	/**
+	 * Creates a GatewayServer run in a daemon thread.
+	 *
+	 * @return The created GatewayServer
+	 */
+	static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
+		if (gatewayServer != null) {
+			return gatewayServer;
+		}
+		CompletableFuture<GatewayServer> gatewayServerFuture = new CompletableFuture<>();
+		Thread thread = new Thread(() -> {
+			int freePort = NetUtils.getAvailablePort();
+			GatewayServer server = new GatewayServer.GatewayServerBuilder()
+				.gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort)))
+				.javaPort(0)
+				.build();
+			gatewayServerFuture.complete(server);
+			server.start(true);
+		});
+		thread.setName("py4j-gateway");
+		thread.setDaemon(true);
+		thread.start();
+		thread.join();
+		gatewayServer = gatewayServerFuture.get();
+		return gatewayServer;
+	}
+
+	static Process launchPy4jPythonClient(
+			GatewayServer gatewayServer,
+			ReadableConfig config,
+			List<String> commands,
+			String entryPointScript,
+			String tmpDir) throws IOException {
+		PythonEnvironment pythonEnv = PythonEnvUtils.preparePythonEnvironment(
+			config, entryPointScript, tmpDir);
+		// set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python process.
+		pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
+		// set env variable PYFLINK_CALLBACK_PORT for creating callback server in python process.
+		pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", String.valueOf(gatewayServer.getCallbackClient().getPort()));
+		// start the python process.
+		return PythonEnvUtils.startPythonProcess(pythonEnv, commands);
+	}
+
+	static GatewayServer getGatewayServer() {
+		return gatewayServer;
+	}
+
+	static void removeGatewayServer() {
 
 Review comment:
   Could we place all the gateway server methods, e.g. startGatewayServer/getGatewayServer, together?
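
   To make the suggested layout concrete, here is a minimal sketch of the start-once/reuse pattern described in the Javadoc above, with the start, get and remove accessors kept adjacent. It deliberately avoids the py4j types: the class GatewayRegistrySketch, the Object placeholder and the Supplier parameter are all hypothetical stand-ins, not the PR's code.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the gateway-related part of PythonEnvUtils.
final class GatewayRegistrySketch {

    // Placeholder for the shared gateway instance (a GatewayServer in the PR).
    private static Object gateway = null;

    /** Starts the gateway on first use and returns the same instance afterwards. */
    static synchronized Object startGateway(Supplier<Object> starter) {
        if (gateway == null) {
            gateway = starter.get();
        }
        return gateway;
    }

    /** Returns the current gateway, or null if none has been started. */
    static synchronized Object getGateway() {
        return gateway;
    }

    /** Forgets the current gateway so that the next start creates a new one. */
    static synchronized void removeGateway() {
        gateway = null;
    }
}
```

   Keeping the three accessors next to the field they guard makes it easier to check that every read and write goes through the same lock.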
