Posted to dev@flink.apache.org by "Zhenghua Gao (Jira)" <ji...@apache.org> on 2020/02/28 08:41:00 UTC
[jira] [Created] (FLINK-16327) Add TableEnvironment.fromElements interfaces for usability
Zhenghua Gao created FLINK-16327:
------------------------------------
Summary: Add TableEnvironment.fromElements interfaces for usability
Key: FLINK-16327
URL: https://issues.apache.org/jira/browse/FLINK-16327
Project: Flink
Issue Type: New Feature
Components: Table SQL / API
Affects Versions: 1.11.0
Reporter: Zhenghua Gao
h1. Interface
{code:java}
/**
 * Creates a table from a collection of objects (known as its elements). The schema of the
 * table is inferred from the types of the elements.
 *
 * @param data the elements of the table
 */
Table fromElements(Collection<?> data);

/**
 * Creates a table from a collection of objects (known as its elements). The schema of the
 * table is derived from the given data type rather than inferred from the elements.
 *
 * @param data the elements of the table
 * @param dataType the data type of the elements
 */
Table fromElements(Collection<?> data, DataType dataType);
{code}
h1. Use Case
* One potential use case for the Table API
{code:scala}
// JInt and JDouble are aliases for java.lang.Integer and java.lang.Double.
@Test
def testUnregisteredCollectionSource1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  // The schema is inferred from the elements; as(...) then names the fields.
  tEnv.fromElements(data.asJava)
    .as('first, 'id, 'score, 'last)
    .where('id > 4)
    .select('last, 'score * 2)
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}

@Test
def testUnregisteredCollectionSource2(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  // The explicit row type supplies both field names and field types.
  val dataType = DataTypes.ROW(
    DataTypes.FIELD("first", DataTypes.STRING()),
    DataTypes.FIELD("id", DataTypes.INT()),
    DataTypes.FIELD("score", DataTypes.DOUBLE()),
    DataTypes.FIELD("last", DataTypes.STRING()))

  tEnv.fromElements(data.asJava, dataType)
    .where('id > 4)
    .select('last, 'score * 2)
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}
{code}
* One potential use case for SQL
{code:scala}
// JInt and JDouble are aliases for java.lang.Integer and java.lang.Double.
@Test
def testUnregisteredCollectionSource1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  // The schema is inferred from the elements; as(...) then names the fields.
  val table = tEnv.fromElements(data.asJava).as('first, 'id, 'score, 'last)
  tEnv.createTemporaryView("T", table)

  tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}

@Test
def testUnregisteredCollectionSource2(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  // The explicit row type supplies both field names and field types.
  val dataType = DataTypes.ROW(
    DataTypes.FIELD("first", DataTypes.STRING()),
    DataTypes.FIELD("id", DataTypes.INT()),
    DataTypes.FIELD("score", DataTypes.DOUBLE()),
    DataTypes.FIELD("last", DataTypes.STRING()))

  val table = tEnv.fromElements(data.asJava, dataType)
  tEnv.createTemporaryView("T", table)

  tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}
{code}
h1. The proposal
* data type inference
The first interface requires inferring the data type from the data itself. DataTypeExtractor is a candidate tool, but it does not support Scala tuples, Row, etc. Since Row and Scala tuples are the most common element types in our test cases, we could special-case them and recursively traverse their fields to collect the types of the underlying objects. This covers most cases and improves usability.
* proposed changes
** A CollectionQueryOperation that implements QueryOperation to describe the relational operation
** Logical and physical RelNodes for the legacy planner. The physical node translates the data to a DataStream
** Logical and physical RelNodes for the blink planner. The physical node translates the data to a Transformation
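The recursive traversal mentioned under "data type inference" could look roughly like the sketch below. It is plain Java with no Flink dependency and is only meant to illustrate the idea: Object[] stands in for Row or a Scala tuple, the type names are plain strings that merely resemble the DataTypes factory names, and the f0, f1, ... field names, the class name and both method names are assumptions made for illustration, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not a Flink class.
class TypeInferenceSketch {

    /** Infers a type description for one value; Object[] is a stand-in for Row / Scala tuple. */
    static String inferType(Object value) {
        if (value == null) {
            // A null carries no type information, so inference has to give up here.
            throw new IllegalArgumentException("Cannot infer a type from null");
        }
        if (value instanceof String) return "STRING";
        if (value instanceof Integer) return "INT";
        if (value instanceof Long) return "BIGINT";
        if (value instanceof Double) return "DOUBLE";
        if (value instanceof Boolean) return "BOOLEAN";
        if (value instanceof Object[]) {
            // Composite value: recurse into every field and assemble a row type.
            Object[] fields = (Object[]) value;
            List<String> parts = new ArrayList<>();
            for (int i = 0; i < fields.length; i++) {
                parts.add("f" + i + " " + inferType(fields[i]));
            }
            return "ROW<" + String.join(", ", parts) + ">";
        }
        throw new IllegalArgumentException("Unsupported type: " + value.getClass().getName());
    }

    /** Infers one common type for all elements and rejects collections with mixed types. */
    static String inferElementType(Collection<?> data) {
        if (data.isEmpty()) {
            throw new IllegalArgumentException("Cannot infer a type from an empty collection");
        }
        String common = null;
        for (Object element : data) {
            String type = inferType(element);
            if (common == null) {
                common = type;
            } else if (!common.equals(type)) {
                throw new IllegalArgumentException(
                    "Elements have different types: " + common + " vs " + type);
            }
        }
        return common;
    }

    public static void main(String[] args) {
        List<Object[]> data =
            Collections.singletonList(new Object[] {"Mike", 5, 12.3, "Smith"});
        // Prints: ROW<f0 STRING, f1 INT, f2 DOUBLE, f3 STRING>
        System.out.println(inferElementType(data));
    }
}
```

The sketch also shows why the second interface is useful: an empty collection, null fields, or elements of differing types leave nothing reliable to infer from, and an explicitly passed data type avoids the problem.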
--
This message was sent by Atlassian Jira
(v8.3.4#803005)