You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by he...@apache.org on 2019/09/18 11:40:12 UTC

[flink] 02/03: [FLINK-14014][python][table-common] Adds basic classes PythonFunction, PythonEnv, PythonFunctionInfo

This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 10cb5abb7d752aeee6e045636446c00e0eda4f00
Author: Dian Fu <fu...@alibaba-inc.com>
AuthorDate: Wed Sep 18 17:12:13 2019 +0800

    [FLINK-14014][python][table-common] Adds basic classes PythonFunction, PythonEnv, PythonFunctionInfo
    
    Introduces basic classes such as PythonEnv, PythonFunction, PythonFunctionInfo
    into flink-table-common which hold the information such as the Python execution
    environment, the serialized Python functions, etc. These classes are located in
    flink-table-common because they will be used by the added RelNode which will be
    introduced later.
---
 .../flink/table/functions/python/PythonEnv.java    | 77 ++++++++++++++++++++++
 .../table/functions/python/PythonFunction.java     | 41 ++++++++++++
 .../table/functions/python/PythonFunctionInfo.java | 60 +++++++++++++++++
 3 files changed, 178 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonEnv.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonEnv.java
new file mode 100644
index 0000000..5d2be94
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonEnv.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Immutable description of a Python execution environment: executable, worker command and execution type.
+ */
+@Internal
+public final class PythonEnv implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The path of the Python executable file used.
+	 */
+	private final String pythonExec;
+
+	/**
+	 * The command used to start the Python worker process.
+	 */
+	private final String pythonWorkerCmd;
+
+	/**
+	 * The execution type of the Python worker; it defines how the Python functions are executed.
+	 */
+	private final ExecType execType;
+
+	public PythonEnv(
+		String pythonExec,
+		String pythonWorkerCmd,
+		ExecType execType) {
+		this.pythonExec = Preconditions.checkNotNull(pythonExec); // all three arguments go through checkNotNull before being stored
+		this.pythonWorkerCmd = Preconditions.checkNotNull(pythonWorkerCmd);
+		this.execType = Preconditions.checkNotNull(execType);
+	}
+
+	public String getPythonExec() {
+		return pythonExec; // the value handed to the constructor, unchanged
+	}
+
+	public String getPythonWorkerCmd() {
+		return pythonWorkerCmd; // the value handed to the constructor, unchanged
+	}
+
+	public ExecType getExecType() {
+		return execType; // the value handed to the constructor, unchanged
+	}
+
+	/**
+	 * The execution type specifies how to execute the Python function.
+	 */
+	public enum ExecType {
+		// The Python function is executed in a separate process (the only type defined here).
+		PROCESS
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonFunction.java
new file mode 100644
index 0000000..cd6bf9c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.python;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+/**
+ * The base interface of a wrapper of a Python function. It gives access to the serialized
+ * Python function and to the {@link PythonEnv} it is associated with.
+ */
+@Internal
+public interface PythonFunction extends Serializable {
+
+	/**
+	 * Returns the serialized representation of the user-defined Python function as a byte array.
+	 */
+	byte[] getSerializedPythonFunction();
+
+	/**
+	 * Returns the {@link PythonEnv} describing the Python execution environment.
+	 */
+	PythonEnv getPythonEnv();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonFunctionInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonFunctionInfo.java
new file mode 100644
index 0000000..88d2394
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonFunctionInfo.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * PythonFunctionInfo contains the execution information of a Python function, such as
+ * the actual Python function and its input arguments.
+ */
+@Internal
+public final class PythonFunctionInfo implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The Python function to be executed.
+	 */
+	private final PythonFunction pythonFunction;
+
+	/**
+	 * The input arguments. An argument could be an input offset of the input row or
+	 * the execution result of another Python function described as a PythonFunctionInfo.
+	 */
+	private final Object[] inputs;
+
+	public PythonFunctionInfo(
+		PythonFunction pythonFunction,
+		Object[] inputs) {
+		this.pythonFunction = Preconditions.checkNotNull(pythonFunction);
+		this.inputs = Preconditions.checkNotNull(inputs); // only the array reference is checked and stored; no copy is made
+	}
+
+	public PythonFunction getPythonFunction() {
+		return this.pythonFunction;
+	}
+
+	public Object[] getInputs() {
+		return this.inputs; // NOTE: returns the internal array itself, not a copy; caller mutations are visible here
+	}
+}