You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/26 17:03:11 UTC
flink git commit: [FLINK-4252] [table] Validate input and output
classes of Table API
Repository: flink
Updated Branches:
refs/heads/master 7eb45c133 -> f150f9877
[FLINK-4252] [table] Validate input and output classes of Table API
This closes #2507.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f150f987
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f150f987
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f150f987
Branch: refs/heads/master
Commit: f150f987772c8d96f41a5acd1d20cba6622cb5c9
Parents: 7eb45c1
Author: twalthr <tw...@apache.org>
Authored: Fri Sep 16 14:27:47 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Sep 26 18:58:44 2016 +0200
----------------------------------------------------------------------
.../api/java/table/BatchTableEnvironment.scala | 5 ++---
.../flink/api/table/BatchTableEnvironment.scala | 4 +++-
.../flink/api/table/FlinkRelBuilder.scala | 4 ++--
.../api/table/StreamTableEnvironment.scala | 2 ++
.../flink/api/table/TableEnvironment.scala | 15 +++++++++++++
.../api/java/batch/table/FromDataSetITCase.java | 23 ++++++++++++++++++++
6 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
index 9ba5b20..a4f40d5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
@@ -17,12 +17,11 @@
*/
package org.apache.flink.api.java.table
-import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.table.expressions.ExpressionParser
-import org.apache.flink.api.table.{Row, TableConfig, Table}
+import org.apache.flink.api.table.{Table, TableConfig}
/**
* The [[org.apache.flink.api.table.TableEnvironment]] for a Java batch [[DataSet]]
http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index eb4c819..ad3ff7a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -190,7 +190,7 @@ abstract class BatchTableEnvironment(
*
* @param table The table for which the AST and execution plan will be returned.
*/
- def explain(table: Table): String = explain(table: Table, false)
+ def explain(table: Table): String = explain(table: Table, extended = false)
/**
* Registers a [[DataSet]] as a table under a given name in the [[TableEnvironment]]'s catalog.
@@ -240,6 +240,8 @@ abstract class BatchTableEnvironment(
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+ validateType(tpe)
+
val relNode = table.getRelNode
// decorrelate
http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index e3bb97e..3827f05 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -19,7 +19,7 @@
package org.apache.flink.api.table
import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.plan.{Context, RelOptCluster, RelOptSchema}
+import org.apache.calcite.plan.{Context, RelOptCluster, RelOptPlanner, RelOptSchema}
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.schema.SchemaPlus
@@ -38,7 +38,7 @@ class FlinkRelBuilder(
cluster,
relOptSchema) {
- def getPlanner = cluster.getPlanner
+ def getPlanner: RelOptPlanner = cluster.getPlanner
def getCluster = cluster
http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index f73cd3f..e3e5751 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -240,6 +240,8 @@ abstract class StreamTableEnvironment(
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+ validateType(tpe)
+
val relNode = table.getRelNode
// decorrelate
http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index b95198c..f56df0c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.table
+import java.lang.reflect.Modifier
import java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.config.Lex
@@ -242,6 +243,16 @@ abstract class TableEnvironment(val config: TableConfig) {
frameworkConfig
}
+ protected def validateType(typeInfo: TypeInformation[_]): Unit = {
+ val clazz = typeInfo.getTypeClass
+ if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+ !Modifier.isPublic(clazz.getModifiers) ||
+ clazz.getCanonicalName == null) {
+ throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+ s"static and globally accessible.")
+ }
+ }
+
/**
* Returns field names and field positions for a given [[TypeInformation]].
*
@@ -257,6 +268,8 @@ abstract class TableEnvironment(val config: TableConfig) {
protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
(Array[String], Array[Int]) =
{
+ validateType(inputType)
+
val fieldNames: Array[String] = inputType match {
case t: TupleTypeInfo[A] => t.getFieldNames
case c: CaseClassTypeInfo[A] => c.getFieldNames
@@ -286,6 +299,8 @@ abstract class TableEnvironment(val config: TableConfig) {
inputType: TypeInformation[A],
exprs: Array[Expression]): (Array[String], Array[Int]) = {
+ validateType(inputType)
+
val indexedNames: Array[(Int, String)] = inputType match {
case a: AtomicType[A] =>
if (exprs.length != 1) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
index af96a04..e6b9226 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
@@ -299,8 +299,31 @@ public class FromDataSetITCase extends TableProgramsTestBase {
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b, c");
}
+ @Test(expected = TableException.class)
+ public void testNonStaticClassInput() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // Must fail since class is not static
+ tableEnv.fromDataSet(env.fromElements(new MyNonStatic()), "name");
+ }
+
+ @Test(expected = TableException.class)
+ public void testNonStaticClassOutput() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // Must fail since class is not static
+ Table t = tableEnv.fromDataSet(env.fromElements(1, 2, 3), "number");
+ tableEnv.toDataSet(t, MyNonStatic.class);
+ }
+
// --------------------------------------------------------------------------------------------
+ public class MyNonStatic {
+ public int number;
+ }
+
@SuppressWarnings("unused")
public static class SmallPojo {