You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:27 UTC
[20/50] [abbrv] flink git commit: [FLINK-6059] [table] Reject
GenericType<Row> when converting DataSet or DataStream to Table.
[FLINK-6059] [table] Reject GenericType<Row> when converting DataSet or DataStream to Table.
This closes #3546.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8eb55f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8eb55f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8eb55f1
Branch: refs/heads/table-retraction
Commit: c8eb55f17d64722bb600c1083a478ab99e53f4ec
Parents: 2c68085
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Mar 15 13:24:42 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Apr 29 00:44:55 2017 +0200
----------------------------------------------------------------------
.../flink/table/api/TableEnvironment.scala | 13 +++++++-
.../api/java/batch/TableEnvironmentITCase.java | 31 ++++++++++++++++++++
.../flink/table/TableEnvironmentTest.scala | 18 ++++++++++--
.../scala/batch/TableEnvironmentITCase.scala | 30 +++++++++++++++++++
4 files changed, 88 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 16c40fe..bd974b0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -565,7 +565,14 @@ abstract class TableEnvironment(val config: TableConfig) {
*/
protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
(Array[String], Array[Int]) = {
- (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType))
+
+ if (inputType.isInstanceOf[GenericTypeInfo[A]] && inputType.getTypeClass == classOf[Row]) {
+ throw new TableException(
+ "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+ "Please specify the type of the input with a RowTypeInfo.")
+ } else {
+ (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType))
+ }
}
/**
@@ -584,6 +591,10 @@ abstract class TableEnvironment(val config: TableConfig) {
TableEnvironment.validateType(inputType)
val indexedNames: Array[(Int, String)] = inputType match {
+ case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
+ throw new TableException(
+ "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+ "Please specify the type of the input with a RowTypeInfo.")
case a: AtomicType[A] =>
if (exprs.length != 1) {
throw new TableException("Table of atomic type can only have a single field.")
http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index e165983..cab3855 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.calcite.tools.RuleSets;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
@@ -46,6 +47,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static org.junit.Assert.assertTrue;
+
@RunWith(Parameterized.class)
public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
@@ -375,6 +378,34 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
}
@Test(expected = TableException.class)
+ public void testGenericRow() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // use null value the enforce GenericType
+ DataSet<Row> dataSet = env.fromElements(Row.of(1, 2L, "Hello", null));
+ assertTrue(dataSet.getType() instanceof GenericTypeInfo);
+ assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
+
+ // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+ tableEnv.fromDataSet(dataSet);
+ }
+
+ @Test(expected = TableException.class)
+ public void testGenericRowWithAlias() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // use null value the enforce GenericType
+ DataSet<Row> dataSet = env.fromElements(Row.of((Integer)null));
+ assertTrue(dataSet.getType() instanceof GenericTypeInfo);
+ assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
+
+ // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+ tableEnv.fromDataSet(dataSet, "nullField");
+ }
+
+ @Test(expected = TableException.class)
public void testAsWithToFewFields() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 3d93f45..9939a9c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -21,13 +21,13 @@ package org.apache.flink.table
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
import org.apache.flink.table.api.scala._
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo, TypeExtractor}
import org.apache.flink.table.api.TableException
import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil._
-
+import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, binaryNode, streamTableNode, term, unaryNode}
+import org.apache.flink.types.Row
import org.junit.Test
import org.junit.Assert.assertEquals
@@ -46,6 +46,8 @@ class TableEnvironmentTest extends TableTestBase {
val atomicType = INT_TYPE_INFO
+ val genericRowType = new GenericTypeInfo[Row](classOf[Row])
+
@Test
def testGetFieldInfoTuple(): Unit = {
val fieldInfo = tEnv.getFieldInfo(tupleType)
@@ -78,6 +80,11 @@ class TableEnvironmentTest extends TableTestBase {
fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
}
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoGenericRow(): Unit = {
+ tEnv.getFieldInfo(genericRowType)
+ }
+
@Test
def testGetFieldInfoTupleNames(): Unit = {
val fieldInfo = tEnv.getFieldInfo(
@@ -278,6 +285,11 @@ class TableEnvironmentTest extends TableTestBase {
))
}
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoGenericRowAlias(): Unit = {
+ tEnv.getFieldInfo(genericRowType, Array(UnresolvedFieldReference("first")))
+ }
+
@Test
def testSqlWithoutRegisteringForBatchTables(): Unit = {
val util = batchTestUtil()
http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index 6cbe834..e61e190 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala.batch
import java.util
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
@@ -31,6 +32,7 @@ import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import org.junit.Assert.assertTrue
import scala.collection.JavaConverters._
@@ -254,6 +256,34 @@ class TableEnvironmentITCase(
CollectionDataSets.get3TupleDataSet(env)
.toTable(tEnv, 'a as 'foo, 'b, 'c)
}
+
+ @Test(expected = classOf[TableException])
+ def testGenericRow() {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ // use null value the enforce GenericType
+ val dataSet = env.fromElements(Row.of(null))
+ assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
+ assertTrue(dataSet.getType().getTypeClass == classOf[Row])
+
+ // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+ tableEnv.fromDataSet(dataSet)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGenericRowWithAlias() {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ // use null value the enforce GenericType
+ val dataSet = env.fromElements(Row.of(null))
+ assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
+ assertTrue(dataSet.getType().getTypeClass == classOf[Row])
+
+ // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+ tableEnv.fromDataSet(dataSet, "nullField")
+ }
}
object TableEnvironmentITCase {