You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/04/03 15:43:08 UTC

[GitHub] [flink] hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces

hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#discussion_r271807543
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##########
 @@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ExternalCatalog;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptor;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+
+/**
+ * The base class for batch and stream TableEnvironments.
+ *
+ * <p>The TableEnvironment is a central concept of the Table API and SQL integration. It is
+ * responsible for:
+ *
+ * <ul>
+ *     <li>Registering a Table in the internal catalog</li>
+ *     <li>Registering an external catalog</li>
+ *     <li>Executing SQL queries</li>
+ *     <li>Registering a user-defined (scalar, table, or aggregation) function</li>
+ *     <li>Converting a DataStream or DataSet into a Table</li>
+ *     <li>Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment</li>
+ * </ul>
+ */
+@PublicEvolving
+public interface TableEnvironment {
+
+	/**
+	 * Creates a table from a table source.
+	 *
+	 * @param source table source used as table
+	 */
+	Table fromTableSource(TableSource<?> source);
+
+	/**
+	 * Registers an {@link ExternalCatalog} under a unique name in the TableEnvironment's schema.
+	 * All tables registered in the {@link ExternalCatalog} can be accessed.
+	 *
+	 * @param name            The name under which the externalCatalog will be registered
+	 * @param externalCatalog The externalCatalog to register
+	 */
+	void registerExternalCatalog(String name, ExternalCatalog externalCatalog);
+
+	/**
+	 * Gets a registered {@link ExternalCatalog} by name.
+	 *
+	 * @param name The name to look up the {@link ExternalCatalog}
+	 * @return The {@link ExternalCatalog}
+	 */
+	ExternalCatalog getRegisteredExternalCatalog(String name);
+
+	/**
+	 * Registers a {@link ScalarFunction} under a unique name. Replaces already existing
+	 * user-defined functions under this name.
+	 */
+	void registerFunction(String name, ScalarFunction function);
+
+	/**
+	 * Registers a {@link Table} under a unique name in the TableEnvironment's catalog.
+	 * Registered tables can be referenced in SQL queries.
+	 *
+	 * @param name The name under which the table will be registered.
+	 * @param table The table to register.
+	 */
+	void registerTable(String name, Table table);
+
+	/**
+	 * Registers an external {@link TableSource} in this {@link TableEnvironment}'s catalog.
+	 * Registered tables can be referenced in SQL queries.
+	 *
+	 * @param name        The name under which the {@link TableSource} is registered.
+	 * @param tableSource The {@link TableSource} to register.
+	 */
+	void registerTableSource(String name, TableSource<?> tableSource);
+
+	/**
+	 * Registers an external {@link TableSink} with given field names and types in this
+	 * {@link TableEnvironment}'s catalog.
+	 * Registered sink tables can be referenced in SQL DML statements.
+	 *
+	 * @param name The name under which the {@link TableSink} is registered.
+	 * @param fieldNames The field names to register with the {@link TableSink}.
+	 * @param fieldTypes The field types to register with the {@link TableSink}.
+	 * @param tableSink The {@link TableSink} to register.
+	 */
+	void registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink);
+
+	/**
+	 * Registers an external {@link TableSink} with already configured field names and field types in
+	 * this {@link TableEnvironment}'s catalog.
+	 * Registered sink tables can be referenced in SQL DML statements.
+	 *
+	 * @param name The name under which the {@link TableSink} is registered.
+	 * @param configuredSink The configured {@link TableSink} to register.
+	 */
+	void registerTableSink(String name, TableSink<?> configuredSink);
+
+	/**
+	 * Scans a registered table and returns the resulting {@link Table}.
+	 *
+	 * <p>A table to scan must be registered in the TableEnvironment. It can be either directly
+	 * registered as DataStream, DataSet, or Table or as member of an {@link ExternalCatalog}.
+	 *
+	 * <p>Examples:
+	 *
+	 * <p>Scanning a directly registered table.
+	 * <pre>
+	 * {@code
+	 *   Table tab = tableEnv.scan("tableName");
+	 * }
+	 * </pre>
+	 *
+	 * <p>Scanning a table from a registered catalog.
+	 * <pre>
+	 * {@code
+	 *   Table tab = tableEnv.scan("catalogName", "dbName", "tableName");
+	 * }
+	 * </pre>
+	 *
+	 * @param tablePath The path of the table to scan.
+	 * @throws TableException if no table is found using the given table path.
+	 * @return The resulting {@link Table}.
+	 */
+	Table scan(String... tablePath) throws TableException;
+
+	/**
+	 * Creates a table source and/or table sink from a descriptor.
+	 *
+	 * <p>Descriptors allow for declaring the communication to external systems in an
+	 * implementation-agnostic way. The classpath is scanned for suitable table factories that match
+	 * the desired configuration.
+	 *
+	 * <p>The following example shows how to read from a connector using a JSON format and
+	 * registering a table source as "MyTable":
+	 *
+	 * <pre>
+	 * {@code
+	 *
+	 * tableEnv
+	 *   .connect(
+	 *     new ExternalSystemXYZ()
+	 *       .version("0.11"))
+	 *   .withFormat(
+	 *     new Json()
+	 *       .jsonSchema("{...}")
+	 *       .failOnMissingField(false))
+	 *   .withSchema(
+	 *     new Schema()
+	 *       .field("user-name", "VARCHAR").from("u_name")
+	 *       .field("count", "DECIMAL")
+	 *   .registerSource("MyTable");
+	 * }
+	 *</pre>
+	 *
+	 * @param connectorDescriptor connector descriptor describing the external system
+	 */
+	TableDescriptor connect(ConnectorDescriptor connectorDescriptor);
+
+	/**
+	 * Gets the names of all tables registered in this environment.
+	 *
+	 * @return A list of the names of all registered tables.
+	 */
+	String[] listTables();
+
+	/**
+	 * Gets the names of all functions registered in this environment.
+	 */
+	String[] listUserDefinedFunctions();
+
+	/**
+	 * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
+	 * the result of the given {@link Table}.
+	 *
+	 * @param table The table for which the AST and execution plan will be returned.
+	 */
+	String explain(Table table);
+
+	/**
+	 * Returns completion hints for the given statement at the given cursor position.
+	 * The completion happens case insensitively.
+	 *
+	 * @param statement Partial or slightly incorrect SQL statement
+	 * @param position cursor position
+	 * @return completion hints that fit at the current cursor position
+	 */
+	String[] getCompletionHints(String statement, int position);
 
 Review comment:
   Make sense. Maybe we can deprecate this method first and drop it in the next release, as it is public in the previous `TableEnvironment`. What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services