Posted to user@flink.apache.org by nsengupta <se...@gmail.com> on 2017/02/04 16:46:38 UTC
Table API: java.sql.DateTime is not supported;
I am reading a bunch of records from a CSV file. A record looks like this:
"4/1/2014 0:11:00",40.769,-73.9549,"B02512"
I intend to treat these records as SQL rows and then process them.
Here's the code:
----------------------------------------
package org.nirmalya.exercise

import java.time.LocalDateTime
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.table._
import org.apache.flink.api.table.sources.CsvTableSource
import org.apache.flink.api.scala.table.TableConversions
import org.apache.flink.api.scala._

/**
  * Created by nirmalya on 4/2/17.
  */
object TrafficDataTrainer {

  def main(args: Array[String]): Unit = {

    case class Trip(timeOfPickUp: LocalDateTime, lat: Double, lon: Double, base: String)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val myDataStorePath = "/home/nirmalya/Downloads/Traffic"

    val csvTableSource = new CsvTableSource(
      myDataStorePath + "/traffic-raw-data-apr14.csv",
      Array("timeOfPickUp", "lat", "lon", "base"),
      (
        Array[org.apache.flink.api.common.typeinfo.TypeInformation[_]](
          Types.TIMESTAMP,
          Types.DOUBLE,
          Types.DOUBLE,
          Types.STRING
        )
      )
    )

    tableEnv.registerTableSource("TrafficData", csvTableSource)

    val trafficTable = tableEnv.scan("TrafficData")
    val result = trafficTable.select("timeOfPickUp,lat,lon,base")

    val trafficDataSet = new TableConversions(result).toDataSet[Trip]
    trafficDataSet.collect().take(10).foreach(println)
  }
}
----------------------------------------
At run time, the exception that is thrown is:
------------------------------------------------------
Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.sql.Date' is not supported for the CSV input format.
    at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:306)
    at org.apache.flink.api.table.runtime.io.RowCsvInputFormat.<init>(RowCsvInputFormat.scala:52)
    at org.apache.flink.api.table.sources.CsvTableSource.createCsvInput(CsvTableSource.scala:99)
    at org.apache.flink.api.table.sources.CsvTableSource.getDataSet(CsvTableSource.scala:78)
    at org.apache.flink.api.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:55)
    at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
    at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
    at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41)
    at org.nirmalya.exercise.UberDataTrainer$.main(UberDataTrainer.scala:45)
    at org.nirmalya.exercise.UberDataTrainer.main(UberDataTrainer.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
------------------------------------------------------
I see that at org.apache.flink.api.common.io.GenericCsvInputFormat:303, the
check fails because the stated type is not among the known types. However,
the constructor of *CsvTableSource* accepts /Types.DATE/ as well as
/Types.TIMESTAMP/ (I tried both of them, and the exception is the same).
Could someone please point out where I am going wrong?
-- Nirmalya
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-API-java-sql-DateTime-is-not-supported-tp11439.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Table API: java.sql.DateTime is not supported;
Posted by Fabian Hueske <fh...@gmail.com>.
Hi,
you can also use the CsvTableSource and read the DateTime fields as String.
This will directly give you a table. You can implement a user-defined
scalar function [1] to parse the String into a DateTime type.
The benefit is that you stay in the Table API / SQL and don't have to deal
with the DataSet or DataStream API and the conversion.
Best, Fabian
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html#user-defined-scalar-functions
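A minimal sketch of what such a parsing function could look like, kept free of Flink dependencies so it runs standalone. The class name ParseTimestamp, the pattern "M/d/yyyy H:mm:ss" and the month/day/year reading of the sample record are assumptions, not something stated in the thread. In Flink 1.2 the class would presumably extend the ScalarFunction base class described in [1] and be registered with the table environment; that wiring is left out here.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper, not part of Flink. Assumption: in a real job this class
// would extend Flink's ScalarFunction (see [1]); it is a plain class here so
// that the sketch compiles and runs without Flink on the classpath.
class ParseTimestamp(pattern: String) extends Serializable {

  // DateTimeFormatter is not Serializable, so it is rebuilt lazily from the
  // pattern string instead of being stored as a regular field.
  @transient private lazy val formatter = DateTimeFormatter.ofPattern(pattern)

  // Scalar functions expose their logic through a public method named eval.
  def eval(raw: String): Timestamp =
    if (raw == null) null
    else Timestamp.valueOf(LocalDateTime.parse(raw.trim, formatter))
}

object ParseTimestampDemo {
  def main(args: Array[String]): Unit = {
    // Assumption: "4/1/2014" is month/day/year, i.e. 1 April 2014.
    val parse = new ParseTimestamp("M/d/yyyy H:mm:ss")
    println(parse.eval("4/1/2014 0:11:00"))
  }
}
```

Holding the pattern as a String rather than a formatter is a deliberate choice: user-defined functions are typically shipped to workers in serialized form, and a non-serializable field would fail at that point.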
2017-02-07 3:16 GMT+01:00 nsengupta <se...@gmail.com>:
> Hello Timo,
>
> Thanks for the clarification.
>
> This means that I *cannot use CsvTableSource* as I have in the example.
> Instead, I should:
>
> * Write a custom scalar function to convert STRINGs to other datatypes as
> required
> * Read the file as CsvInput, with all fields as STRINGs
> * Apply the scalar function as appropriate and Map() to a desired
> *DataSet* type
> * /Convert/ the DataSet to a Table
> * Use SQL to access the Table
>
> Is my understanding correct?
>
> -- Nirmalya
Re: Table API: java.sql.DateTime is not supported;
Posted by nsengupta <se...@gmail.com>.
Hello Timo,
Thanks for the clarification.
This means that I *cannot use CsvTableSource* as I have in the example.
Instead, I should:
* Write a custom scalar function to convert STRINGs to other datatypes as
required
* Read the file as CsvInput, with all fields as STRINGs
* Apply the scalar function as appropriate and Map() to a desired
*DataSet* type
* /Convert/ the DataSet to a Table
* Use SQL to access the Table
Is my understanding correct?
-- Nirmalya
Re: Table API: java.sql.DateTime is not supported;
Posted by Timo Walther <tw...@apache.org>.
Hi,
java.sql.Timestamp values have to be formatted like "yyyy-mm-dd
hh:mm:ss[.fff...]". Your field ("4/1/2014 0:11:00") does not match that, so
in your case you need to read it as a String and write your own scalar
function to parse it.
Regards,
Timo
Am 04/02/17 um 17:46 schrieb nsengupta:
> "4/1/2014 0:11:00",40.769,-73.9549,"B02512"
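The format mismatch described above can be reproduced outside Flink. The sketch below uses the JDK's java.sql.Timestamp.valueOf as a stand-in for the format check; whether Flink's CSV reader goes through the same call is not stated in the thread. The object name, the pattern "M/d/yyyy H:mm:ss" and the month/day/year reading of the date are assumptions.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Try

object TimestampFormatCheck {

  // Assumption: the CSV field is month/day/year, so "4/1/2014" is 1 April 2014.
  val csvPattern: DateTimeFormatter = DateTimeFormatter.ofPattern("M/d/yyyy H:mm:ss")

  // True when java.sql.Timestamp.valueOf accepts the string as it stands.
  def isJdbcTimestamp(s: String): Boolean = Try(Timestamp.valueOf(s)).isSuccess

  // Parse the CSV representation explicitly, then convert to java.sql.Timestamp.
  def toTimestamp(s: String): Timestamp =
    Timestamp.valueOf(LocalDateTime.parse(s, csvPattern))

  def main(args: Array[String]): Unit = {
    val raw = "4/1/2014 0:11:00"
    println(isJdbcTimestamp(raw))                   // false
    println(isJdbcTimestamp("2014-04-01 00:11:00")) // true
    println(toTimestamp(raw))                       // 2014-04-01 00:11:00.0
  }
}
```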