You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/26 17:03:11 UTC

flink git commit: [FLINK-4252] [table] Validate input and output classes of Table API

Repository: flink
Updated Branches:
  refs/heads/master 7eb45c133 -> f150f9877


[FLINK-4252] [table] Validate input and output classes of Table API

This closes #2507.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f150f987
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f150f987
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f150f987

Branch: refs/heads/master
Commit: f150f987772c8d96f41a5acd1d20cba6622cb5c9
Parents: 7eb45c1
Author: twalthr <tw...@apache.org>
Authored: Fri Sep 16 14:27:47 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Sep 26 18:58:44 2016 +0200

----------------------------------------------------------------------
 .../api/java/table/BatchTableEnvironment.scala  |  5 ++---
 .../flink/api/table/BatchTableEnvironment.scala |  4 +++-
 .../flink/api/table/FlinkRelBuilder.scala       |  4 ++--
 .../api/table/StreamTableEnvironment.scala      |  2 ++
 .../flink/api/table/TableEnvironment.scala      | 15 +++++++++++++
 .../api/java/batch/table/FromDataSetITCase.java | 23 ++++++++++++++++++++
 6 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
index 9ba5b20..a4f40d5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
@@ -17,12 +17,11 @@
  */
 package org.apache.flink.api.java.table
 
-import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
 import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.api.table.expressions.ExpressionParser
-import org.apache.flink.api.table.{Row, TableConfig, Table}
+import org.apache.flink.api.table.{Table, TableConfig}
 
 /**
   * The [[org.apache.flink.api.table.TableEnvironment]] for a Java batch [[DataSet]]

http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index eb4c819..ad3ff7a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -190,7 +190,7 @@ abstract class BatchTableEnvironment(
     *
     * @param table The table for which the AST and execution plan will be returned.
     */
-  def explain(table: Table): String = explain(table: Table, false)
+  def explain(table: Table): String = explain(table: Table, extended = false)
 
   /**
     * Registers a [[DataSet]] as a table under a given name in the [[TableEnvironment]]'s catalog.
@@ -240,6 +240,8 @@ abstract class BatchTableEnvironment(
     */
   protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
 
+    validateType(tpe)
+
     val relNode = table.getRelNode
 
     // decorrelate

http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index e3bb97e..3827f05 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.api.table
 
 import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.plan.{Context, RelOptCluster, RelOptSchema}
+import org.apache.calcite.plan.{Context, RelOptCluster, RelOptPlanner, RelOptSchema}
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rex.RexBuilder
 import org.apache.calcite.schema.SchemaPlus
@@ -38,7 +38,7 @@ class FlinkRelBuilder(
     cluster,
     relOptSchema) {
 
-  def getPlanner = cluster.getPlanner
+  def getPlanner: RelOptPlanner = cluster.getPlanner
 
   def getCluster = cluster
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index f73cd3f..e3e5751 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -240,6 +240,8 @@ abstract class StreamTableEnvironment(
     */
   protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
 
+    validateType(tpe)
+
     val relNode = table.getRelNode
 
     // decorrelate

http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index b95198c..f56df0c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.table
 
+import java.lang.reflect.Modifier
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.config.Lex
@@ -242,6 +243,16 @@ abstract class TableEnvironment(val config: TableConfig) {
     frameworkConfig
   }
 
+  protected def validateType(typeInfo: TypeInformation[_]): Unit = {
+    val clazz = typeInfo.getTypeClass
+    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+        !Modifier.isPublic(clazz.getModifiers) ||
+        clazz.getCanonicalName == null) {
+      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+        s"static and globally accessible.")
+    }
+  }
+
   /**
     * Returns field names and field positions for a given [[TypeInformation]].
     *
@@ -257,6 +268,8 @@ abstract class TableEnvironment(val config: TableConfig) {
   protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
       (Array[String], Array[Int]) =
   {
+    validateType(inputType)
+
     val fieldNames: Array[String] = inputType match {
       case t: TupleTypeInfo[A] => t.getFieldNames
       case c: CaseClassTypeInfo[A] => c.getFieldNames
@@ -286,6 +299,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     inputType: TypeInformation[A],
     exprs: Array[Expression]): (Array[String], Array[Int]) = {
 
+    validateType(inputType)
+
     val indexedNames: Array[(Int, String)] = inputType match {
       case a: AtomicType[A] =>
         if (exprs.length != 1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
index af96a04..e6b9226 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
@@ -299,8 +299,31 @@ public class FromDataSetITCase extends TableProgramsTestBase {
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b,  c");
 	}
 
+	@Test(expected = TableException.class)
+	public void testNonStaticClassInput() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail since class is not static
+		tableEnv.fromDataSet(env.fromElements(new MyNonStatic()), "name");
+	}
+
+	@Test(expected = TableException.class)
+	public void testNonStaticClassOutput() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail since class is not static
+		Table t = tableEnv.fromDataSet(env.fromElements(1, 2, 3), "number");
+		tableEnv.toDataSet(t, MyNonStatic.class);
+	}
+
 	// --------------------------------------------------------------------------------------------
 
+	public class MyNonStatic {
+		public int number;
+	}
+
 	@SuppressWarnings("unused")
 	public static class SmallPojo {