You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Till Rohrmann (JIRA)" <ji...@apache.org> on 2016/07/21 17:21:20 UTC
[jira] [Created] (FLINK-4250) Cannot select column from CsvTableSource
Till Rohrmann created FLINK-4250:
------------------------------------
Summary: Cannot select column from CsvTableSource
Key: FLINK-4250
URL: https://issues.apache.org/jira/browse/FLINK-4250
Project: Flink
Issue Type: Bug
Components: Scala API, Table API & SQL
Affects Versions: 1.1.0
Reporter: Till Rohrmann
Using the Scala Table API and the {{CsvTableSource}}, I cannot select a column from the CSV source. The following code:
{code}
package com.dataartisans.batch
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
import org.apache.flink.api.scala._
import org.apache.flink.api.table.sources.CsvTableSource
import org.apache.flink.api.table.{Row, TableEnvironment, Table}
// Minimal reproduction job: registers a CSV file as a table source named "foobar"
// and runs a SQL query that selects a single column from it.
object CsvTableAPIJob {
// Entry point; `args` is not read by this job.
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val csvFilePath = "table-jobs/src/main/resources/input.csv"
val tblEnv = TableEnvironment.getTableEnvironment(env)
// Declares four columns (key, user, value, timestamp) typed INT, STRING, DOUBLE, STRING, in that order.
val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
tblEnv.registerTableSource("foobar", csvTS)
// NOTE(review): the reported error is "Unsupported call: USER" -- presumably the unquoted
// identifier `user` is treated as the SQL USER function rather than the column; confirm.
val input = tblEnv.sql("SELECT user FROM foobar")
// Per the reported stack trace, the CodeGenException surfaces from this toDataSet call.
tblEnv.toDataSet[Row](input).print()
}
}
{code}
fails with
{code}
Exception in thread "main" org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER
at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782)
at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
at org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168)
at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286)
at org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
at org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
at org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271)
at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21)
at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)