Posted to dev@flink.apache.org by "Zhenghua Gao (Jira)" <ji...@apache.org> on 2020/02/28 08:41:00 UTC

[jira] [Created] (FLINK-16327) Add TableEnvironment.fromElements interfaces for usability

Zhenghua Gao created FLINK-16327:
------------------------------------

             Summary: Add TableEnvironment.fromElements interfaces for usability
                 Key: FLINK-16327
                 URL: https://issues.apache.org/jira/browse/FLINK-16327
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / API
    Affects Versions: 1.11.0
            Reporter: Zhenghua Gao


h1. Interface
{code:java}
/**
 * Creates a table from a collection of objects (known as its elements). The schema of the
 * table is inferred from the types of the elements.
 *
 * @param data a collection of objects
 */
Table fromElements(Collection<?> data);

/**
 * Creates a table from a collection of objects (known as its elements). The schema of the
 * table is derived from the given data type.
 *
 * @param data a collection of objects
 * @param dataType the data type of the elements
 */
Table fromElements(Collection<?> data, DataType dataType);
{code}
h1. Use Cases
 * A potential use case with the Table API:

{code:scala}
@Test 
def testUnregisteredCollectionSource1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
  
  tEnv.fromElements(data.asJava)
    .as('first, 'id, 'score, 'last)
    .where('id > 4)
    .select('last, 'score * 2)
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}

@Test 
def testUnregisteredCollectionSource2(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  val dataType = DataTypes.ROW(
    DataTypes.FIELD("first", DataTypes.STRING()),
    DataTypes.FIELD("id", DataTypes.INT()),
    DataTypes.FIELD("score", DataTypes.DOUBLE()),
    DataTypes.FIELD("last", DataTypes.STRING()))

  tEnv.fromElements(data.asJava, dataType)
    .where('id > 4)
    .select('last, 'score * 2)
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}
{code}
 * A potential use case with SQL:

{code:scala}
@Test 
def testUnregisteredCollectionSource1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
  
  val table = tEnv.fromElements(data.asJava).as('first, 'id, 'score, 'last)
  
  tEnv.createTemporaryView("T", table)

  tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}

@Test 
def testUnregisteredCollectionSource2(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  val dataType = DataTypes.ROW(
    DataTypes.FIELD("first", DataTypes.STRING()),
    DataTypes.FIELD("id", DataTypes.INT()),
    DataTypes.FIELD("score", DataTypes.DOUBLE()),
    DataTypes.FIELD("last", DataTypes.STRING()))

  val table = tEnv.fromElements(data.asJava, dataType)
  tEnv.createTemporaryView("T", table)

  tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}
{code}
h1. The proposal
 * Data type inference

For the first interface, the data type must be inferred from the data itself. One candidate is the DataTypeExtractor, but it does not support Scala tuples, Row, etc. For Row and Scala tuples, the types most commonly used in our test cases, we could enumerate the supported types and recursively traverse the underlying objects to derive the type of every field. This would cover most cases and improve usability.
 * Proposed changes
 ** A CollectionQueryOperation that implements QueryOperation to describe the relational operation.
 ** Logical and physical RelNodes for the legacy planner. The physical node would translate the data into a DataStream.
 ** Logical and physical RelNodes for the blink planner. The physical node would translate the data into a Transformation.
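The recursive inference for the first interface could look roughly like the following minimal sketch. It is an illustration under stated assumptions, not Flink code: SimpleRow is a hypothetical stand-in for Flink's Row, types are plain SQL-style name strings rather than Flink DataType objects, and ElementTypeInference, inferValue, merge and inferSchema are illustrative names. The sketch enumerates the supported value classes, recurses into nested rows, and merges the per-element results so that a null field in one element can be typed from another element (that merging step is a design choice of this sketch, not part of the proposal).

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class ElementTypeInference {

    /** Hypothetical stand-in for Flink's Row: positional field values, no field names. */
    public static final class SimpleRow {
        final Object[] fields;
        public SimpleRow(Object... fields) { this.fields = fields; }
    }

    /** Tiny type tree: an atomic type name, or "ROW" with nested field types. */
    public static final class InferredType {
        final String name;               // e.g. "STRING", "INT", "ROW", or "NULL" (not known yet)
        final List<InferredType> fields; // only used when name is "ROW"

        InferredType(String name, List<InferredType> fields) {
            this.name = name;
            this.fields = fields;
        }

        static InferredType atomic(String name) { return new InferredType(name, new ArrayList<>()); }

        @Override
        public String toString() {
            if (!name.equals("ROW")) return name;
            List<String> parts = new ArrayList<>();
            for (InferredType f : fields) parts.add(f.toString());
            return "ROW<" + String.join(", ", parts) + ">";
        }
    }

    /** Derives the type of one value, recursing into nested rows. */
    static InferredType inferValue(Object value) {
        if (value == null) return InferredType.atomic("NULL");
        if (value instanceof String) return InferredType.atomic("STRING");
        if (value instanceof Integer) return InferredType.atomic("INT");
        if (value instanceof Long) return InferredType.atomic("BIGINT");
        if (value instanceof Double) return InferredType.atomic("DOUBLE");
        if (value instanceof Boolean) return InferredType.atomic("BOOLEAN");
        if (value instanceof SimpleRow) {
            List<InferredType> fieldTypes = new ArrayList<>();
            for (Object f : ((SimpleRow) value).fields) fieldTypes.add(inferValue(f));
            return new InferredType("ROW", fieldTypes);
        }
        throw new IllegalArgumentException("Cannot infer type of " + value.getClass().getName());
    }

    /** Combines two inferred types; a "NULL" (unknown) type is filled in by the other side. */
    static InferredType merge(InferredType a, InferredType b) {
        if (a.name.equals("NULL")) return b;
        if (b.name.equals("NULL")) return a;
        if (!a.name.equals(b.name)) {
            throw new IllegalArgumentException("Conflicting types: " + a + " vs " + b);
        }
        if (!a.name.equals("ROW")) return a;
        if (a.fields.size() != b.fields.size()) {
            throw new IllegalArgumentException("Rows of different arity: " + a + " vs " + b);
        }
        List<InferredType> merged = new ArrayList<>();
        for (int i = 0; i < a.fields.size(); i++) merged.add(merge(a.fields.get(i), b.fields.get(i)));
        return new InferredType("ROW", merged);
    }

    /** Enumerates all elements and merges their types into a single schema. */
    public static InferredType inferSchema(Collection<?> data) {
        InferredType result = InferredType.atomic("NULL");
        for (Object element : data) result = merge(result, inferValue(element));
        return result;
    }

    public static void main(String[] args) {
        List<SimpleRow> data = Arrays.asList(
            new SimpleRow("Mike", 5, 12.3, "Smith"),
            new SimpleRow("Anna", null, 7.5, "Jones"));
        System.out.println(inferSchema(data)); // prints ROW<STRING, INT, DOUBLE, STRING>
    }
}
{code}

A real implementation would produce DataType instances (for example with DataTypes.ROW and DataTypes.FIELD, as in the use cases) and would have to choose field names itself, since a Row carries only positional values; Scala tuples could presumably be handled by the same recursion over their elements.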

--
This message was sent by Atlassian Jira
(v8.3.4#803005)