Posted to user@flink.apache.org by nsengupta <se...@gmail.com> on 2017/02/04 16:46:38 UTC

Table API: java.sql.DateTime is not supported;

I am reading a bunch of records from a CSV file. A record looks like this:

"4/1/2014 0:11:00",40.769,-73.9549,"B02512"

I intend to treat these records as SQL Rows and then process them.

Here's the code:
----------------------------------------
package org.nirmalya.exercise

import java.time.LocalDateTime

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.table._
import org.apache.flink.api.table.sources.CsvTableSource
import org.apache.flink.api.scala.table.TableConversions
import org.apache.flink.api.scala._
/**
  * Created by nirmalya on 4/2/17.
  */
object TrafficDataTrainer {

  def main(args: Array[String]): Unit = {

    case class Trip(timeOfPickUp: LocalDateTime, lat: Double, lon: Double, base: String)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val myDataStorePath = "/home/nirmalya/Downloads/Traffic"

    val csvTableSource = new CsvTableSource(
      myDataStorePath + "/traffic-raw-data-apr14.csv",
      Array("timeOfPickUp", "lat", "lon", "base"),
      (
        Array[org.apache.flink.api.common.typeinfo.TypeInformation[_]](
          Types.TIMESTAMP,
          Types.DOUBLE,
          Types.DOUBLE,
          Types.STRING
        )
      )
    )

    tableEnv.registerTableSource("TrafficData",csvTableSource)

    val trafficTable = tableEnv.scan("TrafficData")

    val result = trafficTable.select("timeOfPickUp,lat,lon,base")

    val trafficDataSet = new TableConversions(result).toDataSet[Trip]

    trafficDataSet.collect().take(10).foreach(println)
  }
}
----------------------------------------

At run time, the exception that is thrown is:

------------------------------------------------------
Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.sql.Date' is not supported for the CSV input format.
	at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:306)
	at org.apache.flink.api.table.runtime.io.RowCsvInputFormat.<init>(RowCsvInputFormat.scala:52)
	at org.apache.flink.api.table.sources.CsvTableSource.createCsvInput(CsvTableSource.scala:99)
	at org.apache.flink.api.table.sources.CsvTableSource.getDataSet(CsvTableSource.scala:78)
	at org.apache.flink.api.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:55)
	at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
	at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
	at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41)
	at org.nirmalya.exercise.UberDataTrainer$.main(UberDataTrainer.scala:45)
	at org.nirmalya.exercise.UberDataTrainer.main(UberDataTrainer.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

------------------------------------------------------

I see that in org.apache.flink.api.common.io.GenericCsvInputFormat:303, the
check fails because the stated type is not one of the known types. However,
the constructor of *CsvTableSource* accepts /Types.DATE/ as well as
/Types.TIMESTAMP/ (I tried both of them, and the exception is the same).

Could someone please point out where I am going wrong?

-- Nirmalya







--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-API-java-sql-DateTime-is-not-supported-tp11439.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Table API: java.sql.DateTime is not supported;

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

You can also use the CsvTableSource and read the DateTime fields as Strings.
This will directly give you a table. You can then implement a user-defined
scalar function [1] to parse each String into a DateTime type.

The benefit is that you stay in the Table API / SQL and don't have to deal
with the DataSet or DataStream API and the conversion.
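
As a rough sketch of the parsing logic such a scalar function could delegate to: the object and method names below are hypothetical, and the pattern "M/d/yyyy H:mm:ss" is an assumption based on the sample record and the "apr14" file name (month first, no zero padding). It uses only the JDK; wrapping it in a ScalarFunction with an eval method would follow the documentation in [1].

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper, not part of Flink.
object PickUpTimeParser {
  // Assumed layout of the CSV field: month/day/year, 24-hour clock, no zero padding.
  private val formatter = DateTimeFormatter.ofPattern("M/d/yyyy H:mm:ss")

  // Parses a raw field such as "4/1/2014 0:11:00" into a java.sql.Timestamp.
  // Throws java.time.format.DateTimeParseException if the text does not match the pattern.
  def parse(raw: String): Timestamp =
    Timestamp.valueOf(LocalDateTime.parse(raw.trim, formatter))
}
```

An eval(s: String) method of the scalar function could then simply return PickUpTimeParser.parse(s).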

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html#user-defined-scalar-functions

2017-02-07 3:16 GMT+01:00 nsengupta <se...@gmail.com>:

> Hello Timo,
>
> Thanks for the clarification.
>
> This means that I *cannot use CsvTableSource*, as I have, in the example.
> Instead, I should:
>
>  *   Write custom Scalar function to convert STRINGs to other datatypes as
> required
>  *   Read the file as CsvInput, with all fields as STRINGs
>  *   Apply the Scalar function as appropriate and Map() to a desired
> *DataSet* type
>  *   /Convert/ the DataSet to a Table
>  *    Use SQL to access the Table
>
> Is my understanding correct?
>
> -- Nirmalya

Re: Table API: java.sql.DateTime is not supported;

Posted by nsengupta <se...@gmail.com>.
Hello Timo,

Thanks for the clarification.

This means that I *cannot use CsvTableSource*, as I have, in the example.
Instead, I should:

 *   Write custom Scalar function to convert STRINGs to other datatypes as
required
 *   Read the file as CsvInput, with all fields as STRINGs
 *   Apply the Scalar function as appropriate and Map() to a desired
*DataSet* type
 *   /Convert/ the DataSet to a Table
 *    Use SQL to access the Table 
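
The "read everything as STRINGs, then map to a typed record" part of these steps can be sketched in plain Scala, without any Flink API. This is only an illustration under assumptions: the TripRecords object and its methods are hypothetical names, the date pattern is guessed from the sample record, and the comma split is naive (it relies on no field containing a comma).

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper showing the String-to-typed-record mapping only.
object TripRecords {
  case class Trip(timeOfPickUp: LocalDateTime, lat: Double, lon: Double, base: String)

  // Assumed layout of the first field: month/day/year, 24-hour clock.
  private val pickUpFormat = DateTimeFormatter.ofPattern("M/d/yyyy H:mm:ss")

  // Strips one pair of surrounding double quotes, if present.
  private def unquote(s: String): String =
    s.trim.stripPrefix("\"").stripSuffix("\"")

  // Converts four String fields into a typed Trip.
  def toTrip(time: String, lat: String, lon: String, base: String): Trip =
    Trip(
      LocalDateTime.parse(unquote(time), pickUpFormat),
      lat.trim.toDouble,
      lon.trim.toDouble,
      unquote(base)
    )

  // Naive split on commas; sufficient only because no field contains a comma.
  def parseLine(line: String): Trip = line.split(",") match {
    case Array(t, la, lo, b) => toTrip(t, la, lo, b)
    case other =>
      throw new IllegalArgumentException(s"Expected 4 fields, got ${other.length}")
  }
}
```

In a Flink job, toTrip would be the function passed to map() over the all-String records.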

Is my understanding correct?

-- Nirmalya




Re: Table API: java.sql.DateTime is not supported;

Posted by Timo Walther <tw...@apache.org>.
Hi,

java.sql.Timestamp values have to be in a format like "yyyy-mm-dd
hh:mm:ss[.fff...]". In your case you need to read the field as a String and
write your own scalar function for parsing.
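
A small check of that format restriction, using only the JDK: java.sql.Timestamp.valueOf accepts the "yyyy-mm-dd hh:mm:ss" form and rejects the form used in the CSV file. That the CSV reader is equally strict is an assumption here; the object and method names are hypothetical.

```scala
import java.sql.Timestamp
import scala.util.Try

// Hypothetical helper for trying out timestamp strings.
object TimestampFormatCheck {
  // True if java.sql.Timestamp.valueOf accepts the string as-is.
  def accepted(s: String): Boolean = Try(Timestamp.valueOf(s)).isSuccess

  def main(args: Array[String]): Unit = {
    println(accepted("2014-04-01 00:11:00")) // JDBC timestamp format: prints true
    println(accepted("4/1/2014 0:11:00"))    // format in the CSV file: prints false
  }
}
```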

Regards,
Timo


On 04/02/17 at 17:46, nsengupta wrote:
> "4/1/2014 0:11:00",40.769,-73.9549,"B02512"