You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:27 UTC

[20/50] [abbrv] flink git commit: [FLINK-6059] [table] Reject GenericType<Row> when converting DataSet or DataStream to Table.

[FLINK-6059] [table] Reject GenericType<Row> when converting DataSet or DataStream to Table.

This closes #3546.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8eb55f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8eb55f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8eb55f1

Branch: refs/heads/table-retraction
Commit: c8eb55f17d64722bb600c1083a478ab99e53f4ec
Parents: 2c68085
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Mar 15 13:24:42 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Apr 29 00:44:55 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/TableEnvironment.scala      | 13 +++++++-
 .../api/java/batch/TableEnvironmentITCase.java  | 31 ++++++++++++++++++++
 .../flink/table/TableEnvironmentTest.scala      | 18 ++++++++++--
 .../scala/batch/TableEnvironmentITCase.scala    | 30 +++++++++++++++++++
 4 files changed, 88 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 16c40fe..bd974b0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -565,7 +565,14 @@ abstract class TableEnvironment(val config: TableConfig) {
     */
   protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
   (Array[String], Array[Int]) = {
-    (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType))
+
+    if (inputType.isInstanceOf[GenericTypeInfo[A]] && inputType.getTypeClass == classOf[Row]) {
+      throw new TableException(
+        "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+          "Please specify the type of the input with a RowTypeInfo.")
+    } else {
+      (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType))
+    }
   }
 
   /**
@@ -584,6 +591,10 @@ abstract class TableEnvironment(val config: TableConfig) {
     TableEnvironment.validateType(inputType)
 
     val indexedNames: Array[(Int, String)] = inputType match {
+      case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
+        throw new TableException(
+          "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+            "Please specify the type of the input with a RowTypeInfo.")
       case a: AtomicType[A] =>
         if (exprs.length != 1) {
           throw new TableException("Table of atomic type can only have a single field.")

http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index e165983..cab3855 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.calcite.tools.RuleSets;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
@@ -46,6 +47,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.junit.Assert.assertTrue;
+
 @RunWith(Parameterized.class)
 public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
 
@@ -375,6 +378,34 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
 	}
 
 	@Test(expected = TableException.class)
+	public void testGenericRow() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// use a null value to enforce GenericType
+		DataSet<Row> dataSet = env.fromElements(Row.of(1, 2L, "Hello", null));
+		assertTrue(dataSet.getType() instanceof GenericTypeInfo);
+		assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
+
+		// Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+		tableEnv.fromDataSet(dataSet);
+	}
+
+	@Test(expected = TableException.class)
+	public void testGenericRowWithAlias() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// use a null value to enforce GenericType
+		DataSet<Row> dataSet = env.fromElements(Row.of((Integer)null));
+		assertTrue(dataSet.getType() instanceof GenericTypeInfo);
+		assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
+
+		// Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+		tableEnv.fromDataSet(dataSet, "nullField");
+	}
+
+	@Test(expected = TableException.class)
 	public void testAsWithToFewFields() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 3d93f45..9939a9c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -21,13 +21,13 @@ package org.apache.flink.table
 import org.apache.flink.api.scala._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
 import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil._
-
+import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, binaryNode, streamTableNode, term, unaryNode}
+import org.apache.flink.types.Row
 import org.junit.Test
 import org.junit.Assert.assertEquals
 
@@ -46,6 +46,8 @@ class TableEnvironmentTest extends TableTestBase {
 
   val atomicType = INT_TYPE_INFO
 
+  val genericRowType = new GenericTypeInfo[Row](classOf[Row])
+
   @Test
   def testGetFieldInfoTuple(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(tupleType)
@@ -78,6 +80,11 @@ class TableEnvironmentTest extends TableTestBase {
     fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
   }
 
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoGenericRow(): Unit = {
+    tEnv.getFieldInfo(genericRowType)
+  }
+
   @Test
   def testGetFieldInfoTupleNames(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(
@@ -278,6 +285,11 @@ class TableEnvironmentTest extends TableTestBase {
       ))
   }
 
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoGenericRowAlias(): Unit = {
+    tEnv.getFieldInfo(genericRowType, Array(UnresolvedFieldReference("first")))
+  }
+
   @Test
   def testSqlWithoutRegisteringForBatchTables(): Unit = {
     val util = batchTestUtil()

http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index 6cbe834..e61e190 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala.batch
 
 import java.util
 
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -31,6 +32,7 @@ import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
+import org.junit.Assert.assertTrue
 
 import scala.collection.JavaConverters._
 
@@ -254,6 +256,34 @@ class TableEnvironmentITCase(
     CollectionDataSets.get3TupleDataSet(env)
       .toTable(tEnv, 'a as 'foo, 'b, 'c)
   }
+
+  @Test(expected = classOf[TableException])
+  def testGenericRow() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // use a null value to enforce GenericType
+    val dataSet = env.fromElements(Row.of(null))
+    assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
+    assertTrue(dataSet.getType().getTypeClass == classOf[Row])
+
+    // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+    tableEnv.fromDataSet(dataSet)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGenericRowWithAlias() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // use a null value to enforce GenericType
+    val dataSet = env.fromElements(Row.of(null))
+    assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
+    assertTrue(dataSet.getType().getTypeClass == classOf[Row])
+
+    // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+    tableEnv.fromDataSet(dataSet, "nullField")
+  }
 }
 
 object TableEnvironmentITCase {