You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/11/17 13:45:31 UTC

flink git commit: [FLINK-4263] [table] SQL's VALUES does not work properly

Repository: flink
Updated Branches:
  refs/heads/master a1362c3af -> 836fe9786


[FLINK-4263] [table] SQL's VALUES does not work properly

This closes #2818.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/836fe978
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/836fe978
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/836fe978

Branch: refs/heads/master
Commit: 836fe9786631f35953c1423e2d92128e9f292621
Parents: a1362c3
Author: twalthr <tw...@apache.org>
Authored: Wed Nov 16 14:37:08 2016 +0100
Committer: twalthr <tw...@apache.org>
Committed: Thu Nov 17 14:39:44 2016 +0100

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |  2 +-
 .../flink/api/table/codegen/CodeGenerator.scala | 65 +++++++++++++++++++-
 .../plan/nodes/dataset/DataSetValues.scala      | 40 ++++++++----
 .../nodes/datastream/DataStreamValues.scala     | 42 +++++++++----
 .../plan/rules/dataSet/DataSetValuesRule.scala  |  3 +-
 .../rules/datastream/DataStreamValuesRule.scala |  3 +-
 .../flink/api/table/runtime/Compiler.scala      | 42 +++++++++++++
 .../api/table/runtime/FlatJoinRunner.scala      |  2 +-
 .../flink/api/table/runtime/FlatMapRunner.scala |  4 +-
 .../api/table/runtime/FunctionCompiler.scala    | 42 -------------
 .../flink/api/table/runtime/MapRunner.scala     |  2 +-
 .../table/runtime/io/ValuesInputFormat.scala    | 46 ++++++++------
 .../flink/api/java/batch/sql/SqlITCase.java     | 20 ++++++
 .../expressions/utils/ExpressionTestBase.scala  |  4 +-
 14 files changed, 221 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 4c91f1c..7e5f591 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -45,7 +45,7 @@ under the License.
 		<dependency>
 			<groupId>org.codehaus.janino</groupId>
 			<artifactId>janino</artifactId>
-			<version>2.7.5</version>
+			<version>3.0.6</version>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index b54c498..bbcd70f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction, Function, MapFunction}
+import org.apache.flink.api.common.io.GenericInputFormat
 import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfo}
@@ -35,7 +36,7 @@ import org.apache.flink.api.table.codegen.Indenter.toISC
 import org.apache.flink.api.table.codegen.calls.ScalarFunctions
 import org.apache.flink.api.table.codegen.calls.ScalarOperators._
 import org.apache.flink.api.table.functions.UserDefinedFunction
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
 import org.apache.flink.api.table.typeutils.TypeCheckUtils._
 import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
 
@@ -98,6 +99,13 @@ class CodeGenerator(
       inputPojoFieldMapping: Array[Int]) =
     this(config, nullableInput, input, None, Some(inputPojoFieldMapping))
 
+  /**
+    * A code generator for generating Flink input formats.
+    *
+    * @param config configuration that determines runtime behavior
+    */
+  def this(config: TableConfig) =
+    this(config, false, TypeConverter.DEFAULT_ROW_TYPE, None, None)
 
   // set of member statements that will be added only once
   // we use a LinkedHashSet to keep the insertion order
@@ -257,6 +265,61 @@ class CodeGenerator(
   }
 
   /**
+    * Generates a values input format that can be passed to Java compiler.
+    *
+    * @param name Class name of the input format. Must not be unique but has to be a
+    *             valid Java class identifier.
+    * @param records code for creating records
+    * @param returnType expected return type
+    * @tparam T Flink Function to be generated.
+    * @return instance of GeneratedFunction
+    */
+  def generateValuesInputFormat[T](
+      name: String,
+      records: Seq[String],
+      returnType: TypeInformation[Any])
+    : GeneratedFunction[GenericInputFormat[T]] = {
+    val funcName = newName(name)
+
+    addReusableOutRecord(returnType)
+
+    val funcCode = j"""
+      public class $funcName extends ${classOf[GenericInputFormat[_]].getCanonicalName} {
+
+        private int nextIdx = 0;
+
+        ${reuseMemberCode()}
+
+        public $funcName() throws Exception {
+          ${reuseInitCode()}
+        }
+
+        @Override
+        public boolean reachedEnd() throws java.io.IOException {
+          return nextIdx >= ${records.length};
+        }
+
+        @Override
+        public Object nextRecord(Object reuse) {
+          switch (nextIdx) {
+            ${records.zipWithIndex.map { case (r, i) =>
+              s"""
+                 |case $i:
+                 |  $r
+                 |break;
+               """.stripMargin
+            }.mkString("\n")}
+          }
+          nextIdx++;
+          return $outRecordTerm;
+        }
+      }
+    """.stripMargin
+
+    GeneratedFunction[GenericInputFormat[T]](funcName, returnType, funcCode)
+  }
+
+  /**
     * Generates an expression that converts the first input (and second input) into the given type.
     * If two inputs are converted, the second input is appended. If objects or variables can
     * be reused, they will be added to reusable code sections internally. The evaluation result

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
index 1b637c8..4f3a257 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
@@ -20,19 +20,18 @@ package org.apache.flink.api.table.plan.nodes.dataset
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.runtime.io.ValuesInputFormat
-import org.apache.flink.api.table.typeutils.RowTypeInfo
 import org.apache.flink.api.table.typeutils.TypeConverter._
-import org.apache.flink.api.table.{BatchTableEnvironment, Row}
 
 import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
 
 /**
   * DataSet RelNode for a LogicalValues.
@@ -42,7 +41,8 @@ class DataSetValues(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     rowRelDataType: RelDataType,
-    tuples: ImmutableList[ImmutableList[RexLiteral]])
+    tuples: ImmutableList[ImmutableList[RexLiteral]],
+    ruleDescription: String)
   extends Values(cluster, rowRelDataType, tuples, traitSet)
   with DataSetRel {
 
@@ -53,7 +53,8 @@ class DataSetValues(
       cluster,
       traitSet,
       getRowType,
-      getTuples
+      getTuples,
+      ruleDescription
     )
   }
 
@@ -75,16 +76,29 @@ class DataSetValues(
       getRowType,
       expectedType,
       config.getNullCheck,
-      config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
+      config.getEfficientTypeUsage)
+
+    val generator = new CodeGenerator(config)
 
-    // convert List[RexLiteral] to Row
-    val rows: Seq[Row] = getTuples.asList.map { t =>
-      val row = new Row(t.size())
-      t.zipWithIndex.foreach( x => row.setField(x._2, x._1.getValue.asInstanceOf[Any]) )
-      row
+    // generate code for every record
+    val generatedRecords = getTuples.asScala.map { r =>
+      generator.generateResultExpression(
+        returnType,
+        getRowType.getFieldNames.asScala,
+        r.asScala)
     }
 
-    val inputFormat = new ValuesInputFormat(rows)
+    // generate input format
+    val generatedFunction = generator.generateValuesInputFormat(
+      ruleDescription,
+      generatedRecords.map(_.code),
+      returnType)
+
+    val inputFormat = new ValuesInputFormat[Any](
+      generatedFunction.name,
+      generatedFunction.code,
+      generatedFunction.returnType)
+
     tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataSet[Any]]
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
index 44130e7..3b98653 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
@@ -25,13 +25,13 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.runtime.io.ValuesInputFormat
-import org.apache.flink.api.table.{Row, StreamTableEnvironment}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
 import org.apache.flink.api.table.typeutils.TypeConverter._
 import org.apache.flink.streaming.api.datastream.DataStream
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
   * DataStream RelNode for LogicalValues.
@@ -40,7 +40,8 @@ class DataStreamValues(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     rowRelDataType: RelDataType,
-    tuples: ImmutableList[ImmutableList[RexLiteral]])
+    tuples: ImmutableList[ImmutableList[RexLiteral]],
+    ruleDescription: String)
   extends Values(cluster, rowRelDataType, tuples, traitSet)
   with DataStreamRel {
 
@@ -51,13 +52,15 @@ class DataStreamValues(
       cluster,
       traitSet,
       getRowType,
-      getTuples
+      getTuples,
+      ruleDescription
     )
   }
 
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]) : DataStream[Any] = {
+      expectedType: Option[TypeInformation[Any]])
+    : DataStream[Any] = {
 
     val config = tableEnv.getConfig
 
@@ -65,16 +68,29 @@ class DataStreamValues(
       getRowType,
       expectedType,
       config.getNullCheck,
-      config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
+      config.getEfficientTypeUsage)
 
-    // convert List[RexLiteral] to Row
-    val rows: Seq[Row] = getTuples.asList.map { t =>
-      val row = new Row(t.size())
-      t.zipWithIndex.foreach( x => row.setField(x._2, x._1.getValue.asInstanceOf[Any]) )
-      row
+    val generator = new CodeGenerator(config)
+
+    // generate code for every record
+    val generatedRecords = getTuples.asScala.map { r =>
+      generator.generateResultExpression(
+        returnType,
+        getRowType.getFieldNames.asScala,
+        r.asScala)
     }
 
-    val inputFormat = new ValuesInputFormat(rows)
+    // generate input format
+    val generatedFunction = generator.generateValuesInputFormat(
+      ruleDescription,
+      generatedRecords.map(_.code),
+      returnType)
+
+    val inputFormat = new ValuesInputFormat[Any](
+      generatedFunction.name,
+      generatedFunction.code,
+      generatedFunction.returnType)
+
     tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]]
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
index c28b458..3d6c0de 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
@@ -41,7 +41,8 @@ class DataSetValuesRule
       rel.getCluster,
       traitSet,
       rel.getRowType,
-      values.getTuples)
+      values.getTuples,
+      description)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
index fa2b428..738642d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
@@ -41,7 +41,8 @@ class DataStreamValuesRule
       rel.getCluster,
       traitSet,
       rel.getRowType,
-      values.getTuples)
+      values.getTuples,
+      description)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
new file mode 100644
index 0000000..c5d566e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.functions.Function
+import org.codehaus.commons.compiler.CompileException
+import org.codehaus.janino.SimpleCompiler
+
+trait Compiler[T] {
+
+  @throws(classOf[CompileException])
+  def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
+    require(cl != null, "Classloader must not be null.")
+    val compiler = new SimpleCompiler()
+    compiler.setParentClassLoader(cl)
+    try {
+      compiler.cook(code)
+    } catch {
+      case e: CompileException =>
+        throw new InvalidProgramException("Table program cannot be compiled. " +
+          "This is a bug. Please file an issue.", e)
+    }
+    compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
index 6e7d099..c6a8fe8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
@@ -31,7 +31,7 @@ class FlatJoinRunner[IN1, IN2, OUT](
     @transient returnType: TypeInformation[OUT])
   extends RichFlatJoinFunction[IN1, IN2, OUT]
   with ResultTypeQueryable[OUT]
-  with FunctionCompiler[FlatJoinFunction[IN1, IN2, OUT]] {
+  with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
index 8a3482f..2e942eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
@@ -31,11 +31,11 @@ class FlatMapRunner[IN, OUT](
     @transient returnType: TypeInformation[OUT])
   extends RichFlatMapFunction[IN, OUT]
   with ResultTypeQueryable[OUT]
-  with FunctionCompiler[FlatMapFunction[IN, OUT]] {
+  with Compiler[FlatMapFunction[IN, OUT]] {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
-  private var function: FlatMapFunction[IN, OUT] = null
+  private var function: FlatMapFunction[IN, OUT] = _
 
   override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala
deleted file mode 100644
index de9b632..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.common.functions.Function
-import org.codehaus.commons.compiler.CompileException
-import org.codehaus.janino.SimpleCompiler
-
-trait FunctionCompiler[T <: Function] {
-
-  @throws(classOf[CompileException])
-  def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
-    require(cl != null, "Classloader must not be null.")
-    val compiler = new SimpleCompiler()
-    compiler.setParentClassLoader(cl)
-    try {
-      compiler.cook(code)
-    } catch {
-      case e: CompileException =>
-        throw new InvalidProgramException("Table program cannot be compiled. " +
-          "This is a bug. Please file an issue.", e)
-    }
-    compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
index f64635b..944b415 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
@@ -30,7 +30,7 @@ class MapRunner[IN, OUT](
     @transient returnType: TypeInformation[OUT])
   extends RichMapFunction[IN, OUT]
   with ResultTypeQueryable[OUT]
-  with FunctionCompiler[MapFunction[IN, OUT]] {
+  with Compiler[MapFunction[IN, OUT]] {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
index 5e0a466..34bff15 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
@@ -19,25 +19,35 @@
 package org.apache.flink.api.table.runtime.io
 
 import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
-import org.apache.flink.api.table.Row
-
-class ValuesInputFormat(val rows: Seq[Row])
-  extends GenericInputFormat[Row]
-    with NonParallelInput {
-
-  var readIdx = 0
-
-  override def reachedEnd(): Boolean = readIdx == rows.size
-
-  override def nextRecord(reuse: Row): Row = {
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.runtime.Compiler
+import org.apache.flink.core.io.GenericInputSplit
+import org.slf4j.LoggerFactory
+
+class ValuesInputFormat[OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends GenericInputFormat[OUT]
+  with NonParallelInput
+  with ResultTypeQueryable[OUT]
+  with Compiler[GenericInputFormat[OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var format: GenericInputFormat[OUT] = _
+
+  override def open(split: GenericInputSplit): Unit = {
+    LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating GenericInputFormat.")
+    format = clazz.newInstance()
+  }
 
-    if (readIdx == rows.size) {
-      return null
-    }
+  override def reachedEnd(): Boolean = format.reachedEnd()
 
-    val outRow = rows(readIdx)
-    readIdx += 1
+  override def nextRecord(reuse: OUT): OUT = format.nextRecord(reuse)
 
-    outRow
-  }
+  override def getProducedType: TypeInformation[OUT] = returnType
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
index 1cc4ff7..5f50517 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
@@ -42,6 +42,26 @@ public class SqlITCase extends TableProgramsTestBase {
 	}
 
 	@Test
+	public void testValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		String sqlQuery = "VALUES (1, 'Test', TRUE, DATE '1944-02-24', 12.4444444444444445)," +
+			"(2, 'Hello', TRUE, DATE '1944-02-24', 12.666666665)," +
+			"(3, 'World', FALSE, DATE '1944-12-24', 12.54444445)";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		resultSet.print();
+		List<Row> results = resultSet.collect();
+		String expected = "3,World,false,1944-12-24,12.5444444500000000\n" +
+			"2,Hello,true,1944-02-24,12.6666666650000000\n" +
+			// Calcite converts to decimals and strings with equal length
+			"1,Test ,true,1944-02-24,12.4444444444444445\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
 	public void testSelectFromTable() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
index ee67ffb..6720759 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
@@ -34,7 +34,7 @@ import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.api.table.functions.UserDefinedFunction
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.runtime.FunctionCompiler
+import org.apache.flink.api.table.runtime.Compiler
 import org.apache.flink.api.table.typeutils.RowTypeInfo
 import org.junit.Assert._
 import org.junit.{After, Before}
@@ -211,7 +211,7 @@ abstract class ExpressionTestBase {
   // ----------------------------------------------------------------------------------------------
 
   // TestCompiler that uses current class loader
-  class TestCompiler[T <: Function] extends FunctionCompiler[T] {
+  class TestCompiler[T <: Function] extends Compiler[T] {
     def compile(genFunc: GeneratedFunction[T]): Class[T] =
       compile(getClass.getClassLoader, genFunc.name, genFunc.code)
   }