You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/09 16:54:20 UTC
[4/4] flink git commit: [FLINK-6476] [table] Add support to convert
DataSet[Row] and DataStream[Row] to Table.
[FLINK-6476] [table] Add support to convert DataSet[Row] and DataStream[Row] to Table.
This closes #3849.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28a89d1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28a89d1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28a89d1c
Branch: refs/heads/release-1.3
Commit: 28a89d1cac79063245bbc1ad9d262e3bc94b17b9
Parents: dd799c7
Author: rtudoran <tu...@ymail.com>
Authored: Mon May 8 20:30:57 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 9 18:53:33 2017 +0200
----------------------------------------------------------------------
.../flink/table/api/TableEnvironment.scala | 18 +++++++++
.../table/api/java/stream/sql/SqlITCase.java | 42 ++++++++++++++++++++
.../flink/table/TableEnvironmentTest.scala | 22 ++++++++++
.../table/api/scala/stream/sql/SqlITCase.scala | 36 +++++++++++++++++
4 files changed, 118 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/28a89d1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bb0de3e..bf4a8e0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -63,6 +63,7 @@ import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
import org.apache.flink.table.validate.FunctionCatalog
import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable.HashMap
@@ -677,6 +678,23 @@ abstract class TableEnvironment(val config: TableConfig) {
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
}
+ case r: RowTypeInfo => {
+ exprs.zipWithIndex flatMap {
+ case (UnresolvedFieldReference(name), idx) =>
+ Some((idx, name))
+ case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
+ val idx = r.getFieldIndex(origName)
+ if (idx < 0) {
+ throw new TableException(s"$origName is not a field of type $r")
+ }
+ Some((idx, name))
+ case (_: TimeAttribute, _) =>
+ None
+ case _ => throw new TableException(
+ "Field reference expression or alias on field expression expected.")
+ }
+
+ }
case tpe => throw new TableException(
s"Source of type $tpe cannot be converted into Table.")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28a89d1c/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index 7c01d2b..0c0b37e 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -30,11 +30,53 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.table.api.java.stream.utils.StreamTestData;
import org.junit.Test;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import java.util.ArrayList;
import java.util.List;
public class SqlITCase extends StreamingMultipleProgramsTestBase {
+
+ @Test
+ public void testRowRegisterRowWithNames() throws Exception { // DataStream<Row> with a named RowTypeInfo -> Table -> SQL projection
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ StreamITCase.clear(); // presumably resets results collected by earlier tests -- confirm in StreamITCase
+
+ List<Row> data = new ArrayList<>();
+ data.add(Row.of(1, 1L, "Hi"));
+ data.add(Row.of(2, 2L, "Hello"));
+ data.add(Row.of(3, 2L, "Hello world"));
+
+ TypeInformation<?>[] types = {
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO};
+ String names[] = {"a","b","c"}; // field names, positionally matching the entries of `types`
+
+ RowTypeInfo typeInfo = new RowTypeInfo(types, names);
+
+ DataStream<Row> ds = env.fromCollection(data).returns(typeInfo); // attach the named row type to the stream explicitly
+
+ Table in = tableEnv.fromDataStream(ds, "a,b,c"); // the field list uses the same names as the RowTypeInfo
+ tableEnv.registerTable("MyTableRow", in);
+
+ String sqlQuery = "SELECT a,c FROM MyTableRow"; // project the first and third field only
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+ resultSet.addSink(new StreamITCase.StringSink());
+ env.execute();
+
+ List<String> expected = new ArrayList<>(); // one "a,c" string per input row
+ expected.add("1,Hi");
+ expected.add("2,Hello");
+ expected.add("3,Hello world");
+
+ StreamITCase.compareWithList(expected);
+ }
@Test
public void testSelect() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/28a89d1c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index ba3b591..98a8edb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -54,6 +54,28 @@ class TableEnvironmentTest extends TableTestBase {
val genericRowType = new GenericTypeInfo[Row](classOf[Row])
@Test
+ def testGetFieldInfoRow(): Unit = { // getFieldInfo on rowType with no field expressions
+ val fieldInfo = tEnv.getFieldInfo(rowType)
+
+ fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1)) // _1 is compared against the names f0, f1, f2
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) // NOTE(review): zip truncates to the shorter side, so a too-short result is not detected
+ }
+
+ @Test
+ def testGetFieldInfoRowNames(): Unit = { // getFieldInfo on rowType with three plain field references
+ val fieldInfo = tEnv.getFieldInfo(
+ rowType,
+ Array(
+ UnresolvedFieldReference("name1"),
+ UnresolvedFieldReference("name2"),
+ UnresolvedFieldReference("name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) // _1 is compared against the referenced names, in order
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) // NOTE(review): zip truncates to the shorter side, so a too-short result is not detected
+ }
+
+ @Test
def testGetFieldInfoTuple(): Unit = {
val fieldInfo = tEnv.getFieldInfo(tupleType)
http://git-wip-us.apache.org/repos/asf/flink/blob/28a89d1c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 4147358..ba8c185 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -23,12 +23,48 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit._
class SqlITCase extends StreamingWithStateTestBase {
+ /** Registers a Table built from a DataStream[Row] and checks a filtering SQL query against it. **/
+ @Test
+ def testRowRegister(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear // presumably resets results collected by earlier tests -- confirm in StreamITCase
+
+ val sqlQuery = "SELECT * FROM MyTableRow WHERE c < 3"
+
+ val data = List(
+ Row.of("Hello", "Worlds", Int.box(1)),
+ Row.of("Hello", "Hiden", Int.box(5)),
+ Row.of("Hello again", "Worlds", Int.box(2)))
+
+ implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO) // implicit: never referenced by name; presumably resolved by env.fromCollection below
+
+ val ds = env.fromCollection(data)
+
+ val t = ds.toTable(tEnv).as('a, 'b, 'c) // name the three row fields a, b, c for the SQL query
+ tEnv.registerTable("MyTableRow", t)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List("Hello,Worlds,1","Hello again,Worlds,2") // the row with c = 5 fails the c < 3 filter
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted) // both sides are sorted, so result order does not matter
+ }
+
/** test unbounded groupby (without window) **/
@Test
def testUnboundedGroupby(): Unit = {