You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/11/17 13:45:31 UTC
flink git commit: [FLINK-4263] [table] SQL's VALUES does not work
properly
Repository: flink
Updated Branches:
refs/heads/master a1362c3af -> 836fe9786
[FLINK-4263] [table] SQL's VALUES does not work properly
This closes #2818.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/836fe978
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/836fe978
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/836fe978
Branch: refs/heads/master
Commit: 836fe9786631f35953c1423e2d92128e9f292621
Parents: a1362c3
Author: twalthr <tw...@apache.org>
Authored: Wed Nov 16 14:37:08 2016 +0100
Committer: twalthr <tw...@apache.org>
Committed: Thu Nov 17 14:39:44 2016 +0100
----------------------------------------------------------------------
flink-libraries/flink-table/pom.xml | 2 +-
.../flink/api/table/codegen/CodeGenerator.scala | 65 +++++++++++++++++++-
.../plan/nodes/dataset/DataSetValues.scala | 40 ++++++++----
.../nodes/datastream/DataStreamValues.scala | 42 +++++++++----
.../plan/rules/dataSet/DataSetValuesRule.scala | 3 +-
.../rules/datastream/DataStreamValuesRule.scala | 3 +-
.../flink/api/table/runtime/Compiler.scala | 42 +++++++++++++
.../api/table/runtime/FlatJoinRunner.scala | 2 +-
.../flink/api/table/runtime/FlatMapRunner.scala | 4 +-
.../api/table/runtime/FunctionCompiler.scala | 42 -------------
.../flink/api/table/runtime/MapRunner.scala | 2 +-
.../table/runtime/io/ValuesInputFormat.scala | 46 ++++++++------
.../flink/api/java/batch/sql/SqlITCase.java | 20 ++++++
.../expressions/utils/ExpressionTestBase.scala | 4 +-
14 files changed, 221 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 4c91f1c..7e5f591 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -45,7 +45,7 @@ under the License.
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
- <version>2.7.5</version>
+ <version>3.0.6</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index b54c498..bbcd70f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction, Function, MapFunction}
+import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfo}
@@ -35,7 +36,7 @@ import org.apache.flink.api.table.codegen.Indenter.toISC
import org.apache.flink.api.table.codegen.calls.ScalarFunctions
import org.apache.flink.api.table.codegen.calls.ScalarOperators._
import org.apache.flink.api.table.functions.UserDefinedFunction
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
import org.apache.flink.api.table.typeutils.TypeCheckUtils._
import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
@@ -98,6 +99,13 @@ class CodeGenerator(
inputPojoFieldMapping: Array[Int]) =
this(config, nullableInput, input, None, Some(inputPojoFieldMapping))
+ /**
+ * A code generator for generating Flink input formats.
+ *
+ * @param config configuration that determines runtime behavior
+ */
+ def this(config: TableConfig) =
+ this(config, false, TypeConverter.DEFAULT_ROW_TYPE, None, None)
// set of member statements that will be added only once
// we use a LinkedHashSet to keep the insertion order
@@ -257,6 +265,61 @@ class CodeGenerator(
}
/**
+ * Generates a values input format that can be passed to the Java compiler.
+ *
+ * @param name Class name of the input format. Need not be unique but has to be a
+ * valid Java class identifier.
+ * @param records code for creating records
+ * @param returnType expected return type
+ * @tparam T Type of the records produced by the generated input format.
+ * @return instance of GeneratedFunction
+ */
+ def generateValuesInputFormat[T](
+ name: String,
+ records: Seq[String],
+ returnType: TypeInformation[Any])
+ : GeneratedFunction[GenericInputFormat[T]] = {
+ val funcName = newName(name)
+
+ addReusableOutRecord(returnType)
+
+ val funcCode = j"""
+ public class $funcName extends ${classOf[GenericInputFormat[_]].getCanonicalName} {
+
+ private int nextIdx = 0;
+
+ ${reuseMemberCode()}
+
+ public $funcName() throws Exception {
+ ${reuseInitCode()}
+ }
+
+ @Override
+ public boolean reachedEnd() throws java.io.IOException {
+ return nextIdx >= ${records.length};
+ }
+
+ @Override
+ public Object nextRecord(Object reuse) {
+ switch (nextIdx) {
+ ${records.zipWithIndex.map { case (r, i) =>
+ s"""
+ |case $i:
+ | $r
+ |break;
+ """.stripMargin
+ }.mkString("\n")}
+ }
+ nextIdx++;
+ return $outRecordTerm;
+ }
+ }
+ """.stripMargin
+
+ GeneratedFunction[GenericInputFormat[T]](funcName, returnType, funcCode)
+ }
+
+ /**
* Generates an expression that converts the first input (and second input) into the given type.
* If two inputs are converted, the second input is appended. If objects or variables can
* be reused, they will be added to reusable code sections internally. The evaluation result
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
index 1b637c8..4f3a257 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
@@ -20,19 +20,18 @@ package org.apache.flink.api.table.plan.nodes.dataset
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.RexLiteral
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
import org.apache.flink.api.table.runtime.io.ValuesInputFormat
-import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.api.table.typeutils.TypeConverter._
-import org.apache.flink.api.table.{BatchTableEnvironment, Row}
import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
/**
* DataSet RelNode for a LogicalValues.
@@ -42,7 +41,8 @@ class DataSetValues(
cluster: RelOptCluster,
traitSet: RelTraitSet,
rowRelDataType: RelDataType,
- tuples: ImmutableList[ImmutableList[RexLiteral]])
+ tuples: ImmutableList[ImmutableList[RexLiteral]],
+ ruleDescription: String)
extends Values(cluster, rowRelDataType, tuples, traitSet)
with DataSetRel {
@@ -53,7 +53,8 @@ class DataSetValues(
cluster,
traitSet,
getRowType,
- getTuples
+ getTuples,
+ ruleDescription
)
}
@@ -75,16 +76,29 @@ class DataSetValues(
getRowType,
expectedType,
config.getNullCheck,
- config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
+ config.getEfficientTypeUsage)
+
+ val generator = new CodeGenerator(config)
- // convert List[RexLiteral] to Row
- val rows: Seq[Row] = getTuples.asList.map { t =>
- val row = new Row(t.size())
- t.zipWithIndex.foreach( x => row.setField(x._2, x._1.getValue.asInstanceOf[Any]) )
- row
+ // generate code for every record
+ val generatedRecords = getTuples.asScala.map { r =>
+ generator.generateResultExpression(
+ returnType,
+ getRowType.getFieldNames.asScala,
+ r.asScala)
}
- val inputFormat = new ValuesInputFormat(rows)
+ // generate input format
+ val generatedFunction = generator.generateValuesInputFormat(
+ ruleDescription,
+ generatedRecords.map(_.code),
+ returnType)
+
+ val inputFormat = new ValuesInputFormat[Any](
+ generatedFunction.name,
+ generatedFunction.code,
+ generatedFunction.returnType)
+
tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataSet[Any]]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
index 44130e7..3b98653 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
@@ -25,13 +25,13 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Values
import org.apache.calcite.rex.RexLiteral
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
import org.apache.flink.api.table.runtime.io.ValuesInputFormat
-import org.apache.flink.api.table.{Row, StreamTableEnvironment}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.api.table.typeutils.TypeConverter._
import org.apache.flink.streaming.api.datastream.DataStream
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
* DataStream RelNode for LogicalValues.
@@ -40,7 +40,8 @@ class DataStreamValues(
cluster: RelOptCluster,
traitSet: RelTraitSet,
rowRelDataType: RelDataType,
- tuples: ImmutableList[ImmutableList[RexLiteral]])
+ tuples: ImmutableList[ImmutableList[RexLiteral]],
+ ruleDescription: String)
extends Values(cluster, rowRelDataType, tuples, traitSet)
with DataStreamRel {
@@ -51,13 +52,15 @@ class DataStreamValues(
cluster,
traitSet,
getRowType,
- getTuples
+ getTuples,
+ ruleDescription
)
}
override def translateToPlan(
tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]) : DataStream[Any] = {
+ expectedType: Option[TypeInformation[Any]])
+ : DataStream[Any] = {
val config = tableEnv.getConfig
@@ -65,16 +68,29 @@ class DataStreamValues(
getRowType,
expectedType,
config.getNullCheck,
- config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
+ config.getEfficientTypeUsage)
- // convert List[RexLiteral] to Row
- val rows: Seq[Row] = getTuples.asList.map { t =>
- val row = new Row(t.size())
- t.zipWithIndex.foreach( x => row.setField(x._2, x._1.getValue.asInstanceOf[Any]) )
- row
+ val generator = new CodeGenerator(config)
+
+ // generate code for every record
+ val generatedRecords = getTuples.asScala.map { r =>
+ generator.generateResultExpression(
+ returnType,
+ getRowType.getFieldNames.asScala,
+ r.asScala)
}
- val inputFormat = new ValuesInputFormat(rows)
+ // generate input format
+ val generatedFunction = generator.generateValuesInputFormat(
+ ruleDescription,
+ generatedRecords.map(_.code),
+ returnType)
+
+ val inputFormat = new ValuesInputFormat[Any](
+ generatedFunction.name,
+ generatedFunction.code,
+ generatedFunction.returnType)
+
tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
index c28b458..3d6c0de 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
@@ -41,7 +41,8 @@ class DataSetValuesRule
rel.getCluster,
traitSet,
rel.getRowType,
- values.getTuples)
+ values.getTuples,
+ description)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
index fa2b428..738642d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
@@ -41,7 +41,8 @@ class DataStreamValuesRule
rel.getCluster,
traitSet,
rel.getRowType,
- values.getTuples)
+ values.getTuples,
+ description)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
new file mode 100644
index 0000000..c5d566e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.functions.Function
+import org.codehaus.commons.compiler.CompileException
+import org.codehaus.janino.SimpleCompiler
+
+trait Compiler[T] {
+
+ @throws(classOf[CompileException])
+ def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
+ require(cl != null, "Classloader must not be null.")
+ val compiler = new SimpleCompiler()
+ compiler.setParentClassLoader(cl)
+ try {
+ compiler.cook(code)
+ } catch {
+ case e: CompileException =>
+ throw new InvalidProgramException("Table program cannot be compiled. " +
+ "This is a bug. Please file an issue.", e)
+ }
+ compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
index 6e7d099..c6a8fe8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
@@ -31,7 +31,7 @@ class FlatJoinRunner[IN1, IN2, OUT](
@transient returnType: TypeInformation[OUT])
extends RichFlatJoinFunction[IN1, IN2, OUT]
with ResultTypeQueryable[OUT]
- with FunctionCompiler[FlatJoinFunction[IN1, IN2, OUT]] {
+ with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
val LOG = LoggerFactory.getLogger(this.getClass)
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
index 8a3482f..2e942eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
@@ -31,11 +31,11 @@ class FlatMapRunner[IN, OUT](
@transient returnType: TypeInformation[OUT])
extends RichFlatMapFunction[IN, OUT]
with ResultTypeQueryable[OUT]
- with FunctionCompiler[FlatMapFunction[IN, OUT]] {
+ with Compiler[FlatMapFunction[IN, OUT]] {
val LOG = LoggerFactory.getLogger(this.getClass)
- private var function: FlatMapFunction[IN, OUT] = null
+ private var function: FlatMapFunction[IN, OUT] = _
override def open(parameters: Configuration): Unit = {
LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala
deleted file mode 100644
index de9b632..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.common.functions.Function
-import org.codehaus.commons.compiler.CompileException
-import org.codehaus.janino.SimpleCompiler
-
-trait FunctionCompiler[T <: Function] {
-
- @throws(classOf[CompileException])
- def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
- require(cl != null, "Classloader must not be null.")
- val compiler = new SimpleCompiler()
- compiler.setParentClassLoader(cl)
- try {
- compiler.cook(code)
- } catch {
- case e: CompileException =>
- throw new InvalidProgramException("Table program cannot be compiled. " +
- "This is a bug. Please file an issue.", e)
- }
- compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
index f64635b..944b415 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
@@ -30,7 +30,7 @@ class MapRunner[IN, OUT](
@transient returnType: TypeInformation[OUT])
extends RichMapFunction[IN, OUT]
with ResultTypeQueryable[OUT]
- with FunctionCompiler[MapFunction[IN, OUT]] {
+ with Compiler[MapFunction[IN, OUT]] {
val LOG = LoggerFactory.getLogger(this.getClass)
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
index 5e0a466..34bff15 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
@@ -19,25 +19,35 @@
package org.apache.flink.api.table.runtime.io
import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
-import org.apache.flink.api.table.Row
-
-class ValuesInputFormat(val rows: Seq[Row])
- extends GenericInputFormat[Row]
- with NonParallelInput {
-
- var readIdx = 0
-
- override def reachedEnd(): Boolean = readIdx == rows.size
-
- override def nextRecord(reuse: Row): Row = {
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.runtime.Compiler
+import org.apache.flink.core.io.GenericInputSplit
+import org.slf4j.LoggerFactory
+
+class ValuesInputFormat[OUT](
+ name: String,
+ code: String,
+ @transient returnType: TypeInformation[OUT])
+ extends GenericInputFormat[OUT]
+ with NonParallelInput
+ with ResultTypeQueryable[OUT]
+ with Compiler[GenericInputFormat[OUT]] {
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+
+ private var format: GenericInputFormat[OUT] = _
+
+ override def open(split: GenericInputSplit): Unit = {
+ LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+ LOG.debug("Instantiating GenericInputFormat.")
+ format = clazz.newInstance()
+ }
- if (readIdx == rows.size) {
- return null
- }
+ override def reachedEnd(): Boolean = format.reachedEnd()
- val outRow = rows(readIdx)
- readIdx += 1
+ override def nextRecord(reuse: OUT): OUT = format.nextRecord(reuse)
- outRow
- }
+ override def getProducedType: TypeInformation[OUT] = returnType
}
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
index 1cc4ff7..5f50517 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
@@ -42,6 +42,26 @@ public class SqlITCase extends TableProgramsTestBase {
}
@Test
+ public void testValues() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ String sqlQuery = "VALUES (1, 'Test', TRUE, DATE '1944-02-24', 12.4444444444444445)," +
+ "(2, 'Hello', TRUE, DATE '1944-02-24', 12.666666665)," +
+ "(3, 'World', FALSE, DATE '1944-12-24', 12.54444445)";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ resultSet.print();
+ List<Row> results = resultSet.collect();
+ String expected = "3,World,false,1944-12-24,12.5444444500000000\n" +
+ "2,Hello,true,1944-02-24,12.6666666650000000\n" +
+ // Calcite pads strings and extends decimals so that all rows have values of equal length
+ "1,Test ,true,1944-02-24,12.4444444444444445\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
public void testSelectFromTable() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
http://git-wip-us.apache.org/repos/asf/flink/blob/836fe978/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
index ee67ffb..6720759 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
@@ -34,7 +34,7 @@ import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
import org.apache.flink.api.table.functions.UserDefinedFunction
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.runtime.FunctionCompiler
+import org.apache.flink.api.table.runtime.Compiler
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.junit.Assert._
import org.junit.{After, Before}
@@ -211,7 +211,7 @@ abstract class ExpressionTestBase {
// ----------------------------------------------------------------------------------------------
// TestCompiler that uses current class loader
- class TestCompiler[T <: Function] extends FunctionCompiler[T] {
+ class TestCompiler[T <: Function] extends Compiler[T] {
def compile(genFunc: GeneratedFunction[T]): Class[T] =
compile(getClass.getClassLoader, genFunc.name, genFunc.code)
}