You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/09 16:54:20 UTC

[4/4] flink git commit: [FLINK-6476] [table] Add support to convert DataSet[Row] and DataStream[Row] to Table.

[FLINK-6476] [table] Add support to convert DataSet[Row] and DataStream[Row] to Table.

This closes #3849.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28a89d1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28a89d1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28a89d1c

Branch: refs/heads/release-1.3
Commit: 28a89d1cac79063245bbc1ad9d262e3bc94b17b9
Parents: dd799c7
Author: rtudoran <tu...@ymail.com>
Authored: Mon May 8 20:30:57 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 9 18:53:33 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/TableEnvironment.scala      | 18 +++++++++
 .../table/api/java/stream/sql/SqlITCase.java    | 42 ++++++++++++++++++++
 .../flink/table/TableEnvironmentTest.scala      | 22 ++++++++++
 .../table/api/scala/stream/sql/SqlITCase.scala  | 36 +++++++++++++++++
 4 files changed, 118 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28a89d1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bb0de3e..bf4a8e0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -63,6 +63,7 @@ import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.validate.FunctionCatalog
 import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable.HashMap
@@ -677,6 +678,23 @@ abstract class TableEnvironment(val config: TableConfig) {
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
         }
+      case r: RowTypeInfo => {
+        exprs.zipWithIndex flatMap {
+          case (UnresolvedFieldReference(name), idx) =>
+            Some((idx, name))
+          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
+            val idx = r.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new TableException(s"$origName is not a field of type $r")
+            }
+            Some((idx, name))
+          case (_: TimeAttribute, _) =>
+            None
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression expected.")
+        }
+        
+      }
       case tpe => throw new TableException(
         s"Source of type $tpe cannot be converted into Table.")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a89d1c/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index 7c01d2b..0c0b37e 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -30,11 +30,53 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.table.api.java.stream.utils.StreamTestData;
 import org.junit.Test;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class SqlITCase extends StreamingMultipleProgramsTestBase {
+	
+	@Test
+	public void testRowRegisterRowWithNames() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		List<Row> data = new ArrayList<>();
+		data.add(Row.of(1, 1L, "Hi"));
+		data.add(Row.of(2, 2L, "Hello"));
+		data.add(Row.of(3, 2L, "Hello world"));
+		
+		TypeInformation<?>[] types = {
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO};
+		String names[] = {"a","b","c"};
+		
+		RowTypeInfo typeInfo = new RowTypeInfo(types, names);
+		
+		DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
+		
+		Table in = tableEnv.fromDataStream(ds, "a,b,c");
+		tableEnv.registerTable("MyTableRow", in);
+
+		String sqlQuery = "SELECT a,c FROM MyTableRow";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,Hi");
+		expected.add("2,Hello");
+		expected.add("3,Hello world");
+
+		StreamITCase.compareWithList(expected);
+	}
 
 	@Test
 	public void testSelect() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/28a89d1c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index ba3b591..98a8edb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -54,6 +54,28 @@ class TableEnvironmentTest extends TableTestBase {
   val genericRowType = new GenericTypeInfo[Row](classOf[Row])
 
   @Test
+  def testGetFieldInfoRow(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(rowType)
+
+    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+  
+  @Test
+  def testGetFieldInfoRowNames(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      rowType,
+      Array(
+        UnresolvedFieldReference("name1"),
+        UnresolvedFieldReference("name2"),
+        UnresolvedFieldReference("name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+  
+  @Test
   def testGetFieldInfoTuple(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(tupleType)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28a89d1c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 4147358..ba8c185 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -23,12 +23,48 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit._
 
 class SqlITCase extends StreamingWithStateTestBase {
 
+   /** Tests registering a table created from a DataStream[Row]. **/
+  @Test
+  def testRowRegister(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT * FROM MyTableRow WHERE c < 3"
+
+    val data = List(
+      Row.of("Hello", "Worlds", Int.box(1)),
+      Row.of("Hello", "Hiden", Int.box(5)),
+      Row.of("Hello again", "Worlds", Int.box(2)))
+        
+    implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO) // tpe is automatically picked up as the implicit TypeInformation[Row]
+    
+    val ds = env.fromCollection(data)
+    
+    val t = ds.toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTableRow", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List("Hello,Worlds,1","Hello again,Worlds,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+    
   /** test unbounded groupby (without window) **/
   @Test
   def testUnboundedGroupby(): Unit = {