Posted to user@flink.apache.org by karim amer <ka...@gmail.com> on 2018/03/16 21:50:14 UTC
CsvSink
Hi There,
I am trying to write a CsvSink to disk but it's not getting written. I
think the file is getting overwritten or truncated once the stream process
finishes. Does anyone know why the file is getting overwritten or truncated,
and how can I fix this?
tableEnv.registerDataStream("table", watermarkedStream)

val results = tableEnv.sqlQuery(
  """
    |SELECT
    | A
    | FROM table
  """.stripMargin)

val result: Table = results

val path = "file:///path/test/1.txt"
val sink: TableSink[Row] = new CsvTableSink(
  path,                               // output path
  fieldDelim = "|",                   // optional: delimit files by '|'
  numFiles = 1,                       // optional: write to a single file
  writeMode = WriteMode.NO_OVERWRITE)

result.writeToSink(sink)

env.execute("this job")
Thanks
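(The replies below track this down: "table" is a reserved SQL keyword, and WriteMode.NO_OVERWRITE makes the job fail if 1.txt already exists. A minimal corrected sketch of the registration and sink setup, assuming the Flink 1.4/1.5 Scala Table API used in this thread; the table name "events" is illustrative, not from the original post:)

```scala
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.Table
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

// Register under a non-reserved name instead of "table".
tableEnv.registerDataStream("events", watermarkedStream)

val result: Table = tableEnv.sqlQuery(
  """
    |SELECT A
    |  FROM events
  """.stripMargin)

val sink: TableSink[Row] = new CsvTableSink(
  "file:///path/test/1.txt",        // output path
  fieldDelim = "|",                 // optional: delimit fields by '|'
  numFiles = 1,                     // optional: write to a single file
  writeMode = WriteMode.OVERWRITE)  // replace an existing file instead of failing

result.writeToSink(sink)
env.execute("this job")
```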
Re: CsvSink
Posted by Fabian Hueske <fh...@gmail.com>.
Great, thanks for reporting back!
Best, Fabian
2018-03-20 22:40 GMT+01:00 karim amer <ka...@gmail.com>:
> Never mind, I found the error; it has nothing to do with Flink.
> Sorry
>
> On Tue, Mar 20, 2018 at 12:12 PM, karim amer <ka...@gmail.com>
> wrote:
>
>> Here is the output after fixing the Scala issues:
>>
>> https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c
>>
>> On Tue, Mar 20, 2018 at 11:39 AM, karim amer <ka...@gmail.com>
>> wrote:
>>
>>> Never mind, after importing
>>>
>>> import org.apache.flink.api.scala._
>>>
>>> these errors went away and I still have the original problem.
>>> Sorry, my bad.
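(Why that import matters: the Scala DataStream.map takes an implicit TypeInformation "evidence" parameter, and the wildcard import brings the macro that derives it into scope. A stripped-down, hypothetical model of the mechanism, not actual Flink API:)

```scala
// Hypothetical sketch of an "evidence parameter": map compiles only when
// an implicit TypeInformation[R] instance is in scope for the result type.
trait TypeInformation[T]

class MiniStream[T](val value: T) {
  def map[R](f: T => R)(implicit ev: TypeInformation[R]): MiniStream[R] =
    new MiniStream(f(value))
}

object Evidence {
  // In Flink, `import org.apache.flink.api.scala._` supplies such
  // instances implicitly; without it, map fails with exactly the
  // "could not find implicit value for evidence parameter" error above.
  implicit val intInfo: TypeInformation[Int] = new TypeInformation[Int] {}

  val doubled: MiniStream[Int] = new MiniStream("21").map(_.toInt * 2)
}
```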
>>>
>>> On Tue, Mar 20, 2018 at 11:04 AM, karim amer <ka...@gmail.com>
>>> wrote:
>>>
>>>> To clarify, should I file a bug report on sbt hiding the errors in the
>>>> previous email?
>>>>
>>>> On Tue, Mar 20, 2018 at 9:44 AM, karim amer <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> After switching to Maven from sbt I got these errors:
>>>>>
>>>>> Error:(63, 37) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
>>>>> val namedStream = dataStream.map((value:String) => {
>>>>>
>>>>> Error:(63, 37) not enough arguments for method map: (implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
>>>>> Unspecified value parameter evidence$7.
>>>>> val namedStream = dataStream.map((value:String) => {
>>>>>
>>>>> Should I file a bug report?
>>>>>
>>>>> On Tue, Mar 20, 2018 at 9:30 AM, karim amer <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Fabian,
>>>>>> Sorry if I confused you: the first error is from Nico's code, not my
>>>>>> code or snippet.
>>>>>> I am still having the original problem in my snippet, where it writes
>>>>>> a blank CSV file even though I get
>>>>>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>>>>>> after running the job.
>>>>>>
>>>>>> Cheers,
>>>>>> karim
>>>>>>
>>>>>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Karim,
>>>>>>>
>>>>>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
>>>>>>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>>>>>>> It would help if you kept the error message and code consistent;
>>>>>>> otherwise it's not possible to figure out what's going on.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2018-03-20 0:24 GMT+01:00 karim amer <ka...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Nico,
>>>>>>>>
>>>>>>>> I tried to reproduce your code, but registerDataStream keeps
>>>>>>>> failing to register the fields even though I am following your code and the
>>>>>>>> docs.
>>>>>>>> Here is the error:
>>>>>>>> [error] found : Symbol
>>>>>>>> [error] required: org.apache.flink.table.expressions.Expression
>>>>>>>> [error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>>>>>>> [error]
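(That "found: Symbol / required: Expression" mismatch usually means the Table API's own Scala implicits are not in scope; they are what convert 'A, 'B, 'C into field expressions. Based on the Flink 1.4/1.5 documentation, the missing piece would be:)

```scala
// Brings the implicit conversions that turn Scala Symbols ('A, 'B, 'C)
// into org.apache.flink.table.expressions.Expression values:
import org.apache.flink.table.api.scala._
```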
>>>>>>>> I think my code snippet was misleading. Here is the full snippet.
>>>>>>>> Changing the name from "table" didn't fix it for me.
>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>> import org.apache.flink.api.java.utils.ParameterTool
>>>>>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>>>>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>>>>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>>>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>>>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>>>>>>> import org.apache.flink.types.Row
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> object datastreamtotableapi {
>>>>>>>>
>>>>>>>> case class Calls(a: String,
>>>>>>>> b: String,
>>>>>>>> c: String,
>>>>>>>> d: String,
>>>>>>>> e: String,
>>>>>>>> f: String,
>>>>>>>> g: String,
>>>>>>>> h: String,
>>>>>>>> i: String,
>>>>>>>> j: String,
>>>>>>>> k: String,
>>>>>>>> l: String,
>>>>>>>> m: String,
>>>>>>>> n: String,
>>>>>>>> p: String,
>>>>>>>> q: String,
>>>>>>>> r: String,
>>>>>>>> s: String,
>>>>>>>> t: String,
>>>>>>>> v: String,
>>>>>>>> w: String)
>>>>>>>>
>>>>>>>>
>>>>>>>> def main(args: Array[String]) {
>>>>>>>>
>>>>>>>> val params = ParameterTool.fromArgs(args)
>>>>>>>> val input = params.getRequired("input")
>>>>>>>>
>>>>>>>>
>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>>> env.setParallelism(1)
>>>>>>>> val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>>>
>>>>>>>> val dataStream = env.readTextFile(input)
>>>>>>>>
>>>>>>>> val namedStream = dataStream.map((value:String) => {
>>>>>>>>
>>>>>>>> val columns = value.split(",")
>>>>>>>> Calls(columns(0), columns(1),columns(2),columns(3), columns(4),columns(5),
>>>>>>>> columns(6), columns(7),columns(8),columns(9), columns(10), columns(11),
>>>>>>>> columns(12), columns(13),columns(14),columns(15), columns(16), columns(17),
>>>>>>>> columns(18),columns(19), columns(20)
>>>>>>>> )
>>>>>>>> })
>>>>>>>>
>>>>>>>>
>>>>>>>> val cleanedStream = namedStream.filter(_.j == " ").filter(_.k==" ")
>>>>>>>>
>>>>>>>> val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>>>>>> override def extractTimestamp(element: Calls): Long = (element.j.concat(element.k)).toLong
>>>>>>>> })
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>>>>>> val results = tableEnv.sqlQuery( """
>>>>>>>> |SELECT
>>>>>>>> | a
>>>>>>>> | FROM CDRS
>>>>>>>> """.stripMargin)
>>>>>>>>
>>>>>>>>
>>>>>>>> val result: Table = results
>>>>>>>>
>>>>>>>> val path = "file:///Users/test/1.txt"
>>>>>>>> val sink :TableSink[Row]= new CsvTableSink(
>>>>>>>> path, // output path
>>>>>>>> fieldDelim = "|", // optional: delimit files by '|'
>>>>>>>> numFiles = 1, // optional: write to a single file
>>>>>>>> writeMode = WriteMode.OVERWRITE)
>>>>>>>>
>>>>>>>> result.writeToSink(sink)
>>>>>>>>
>>>>>>>>
>>>>>>>> env.execute("this job")
>>>>>>>>
>>>>>>>> }
>>>>>>>> }
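(One thing that stands out in the snippet above: filter(_.j == " ") keeps only records whose field j is a single space, and extractTimestamp then calls toLong on the concatenation of j and k, which would throw NumberFormatException for such blank fields. A defensive sketch, assuming the intent was to drop blank or non-numeric fields rather than keep them:)

```scala
// Keep only records whose timestamp fields are non-blank and numeric,
// so that (j + k).toLong cannot throw when watermarks are assigned.
val cleanedStream = namedStream
  .filter(c => c.j.trim.nonEmpty && c.k.trim.nonEmpty)
  .filter(c => (c.j + c.k).forall(_.isDigit))
```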
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <nico@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Karim,
>>>>>>>>> when I was trying to reproduce your code, I got an exception with the
>>>>>>>>> name 'table' being used. By replacing it and completing the job with
>>>>>>>>> some input, I did see the CSV file popping up. Also, the job was
>>>>>>>>> crashing when the file 1.txt already existed.
>>>>>>>>>
>>>>>>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>>>>>>
>>>>>>>>> def main(args: Array[String]) {
>>>>>>>>> // set up the streaming execution environment
>>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>> val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>>>>
>>>>>>>>> val stream: DataStream[(Int, Long, String)] =
>>>>>>>>> get3TupleDataStream(env)
>>>>>>>>> .assignAscendingTimestamps(_._2)
>>>>>>>>> tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>>>>>>>
>>>>>>>>> val results = tableEnv.sqlQuery( """
>>>>>>>>> |SELECT
>>>>>>>>> | A,C
>>>>>>>>> | FROM mytable
>>>>>>>>> """.stripMargin)
>>>>>>>>>
>>>>>>>>> val result: Table = results
>>>>>>>>>
>>>>>>>>> val path = "file:///tmp/test/1.txt"
>>>>>>>>> val sink: TableSink[Row] = new CsvTableSink(
>>>>>>>>> path, // output path
>>>>>>>>> fieldDelim = "|", // optional: delimit files by '|'
>>>>>>>>> numFiles = 1, // optional: write to a single file
>>>>>>>>> writeMode = WriteMode.NO_OVERWRITE)
>>>>>>>>>
>>>>>>>>> result.writeToSink(sink)
>>>>>>>>>
>>>>>>>>> env.execute("this job")
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> def get3TupleDataStream(env: StreamExecutionEnvironment):
>>>>>>>>> DataStream[(Int, Long, String)] = {
>>>>>>>>> val data = new mutable.MutableList[(Int, Long, String)]
>>>>>>>>> data.+=((1, 1L, "Hi"))
>>>>>>>>> data.+=((2, 2L, "Hello"))
>>>>>>>>> data.+=((3, 2L, "Hello world"))
>>>>>>>>> data.+=((4, 3L, "Hello world, how are you?"))
>>>>>>>>> data.+=((5, 3L, "I am fine."))
>>>>>>>>> data.+=((6, 3L, "Luke Skywalker"))
>>>>>>>>> env.fromCollection(data)
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Nico
Re: CsvSink
Posted by karim amer <ka...@gmail.com>.
Never mind, I found the error; it has nothing to do with Flink.
Sorry
--
karim amer
Re: CsvSink
Posted by karim amer <ka...@gmail.com>.
Here is the output after fixing the Scala issues:
https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c
--
karim amer
Re: CsvSink
Posted by karim amer <ka...@gmail.com>.
Never mind, after importing

import org.apache.flink.api.scala._

these errors went away and I still have the original problem.
Sorry, my bad.
--
karim amer
Re: CsvSink
Posted by karim amer <ka...@gmail.com>.
To clarify: should I file a bug report about sbt hiding the errors in the previous email?
--
karim amer
Re: CsvSink
Posted by karim amer <ka...@gmail.com>.
After switching from sbt to Maven, I got these errors:
Error:(63, 37) could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
val namedStream = dataStream.map((value:String) => {
Error:(63, 37) not enough arguments for method map: (implicit evidence$7:
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
Unspecified value parameter evidence$7.
val namedStream = dataStream.map((value:String) => {
Should I file a bug report?
--
karim amer
Re: CsvSink
Posted by karim amer <ka...@gmail.com>.
Hi Fabian,
Sorry if I confused you: the first error is from Nico's code, not from my code or snippet.
I am still having the original problem in my snippet, where it writes a blank CSV file even though I get
[success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
after running the job.
Cheers,
karim
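(Editorial note: one way to narrow down a blank output file is to print the table's append stream before the sink runs. A hedged sketch, assuming the `tableEnv` and `result` table from the snippets quoted elsewhere in this thread and the Flink 1.4/1.5 Scala Table API:)

```scala
// Hedged debugging sketch: convert the result Table back to a DataStream and
// print it. If nothing prints, the rows are being dropped upstream (e.g. by
// the filter predicates) rather than being lost by the CsvTableSink.
import org.apache.flink.table.api.scala._ // implicit conversions for Table <-> DataStream
import org.apache.flink.types.Row

val appended: DataStream[Row] = tableEnv.toAppendStream[Row](result)
appended.print() // sanity check: does the query produce any rows at all?
```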
--
karim amer
Re: CsvSink
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Karim,
I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
set, 'A, 'B, 'C )" as shown in the error message in your example.
It would help if you kept the error message and the code consistent;
otherwise it's not possible to figure out what's going on.
Best, Fabian
2018-03-20 0:24 GMT+01:00 karim amer <ka...@gmail.com>:
> Hi Nico,
>
> I tried to reproduce your code but registerDataStream keeps failing to
> register the fields even though i am following your code and the Docs.
> here is the error
> [error] found : Symbol
> [error] required: org.apache.flink.table.expressions.Expression
> [error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
> [error]
> I think my code snippet was misleading. Here is the full snippet Changing
> the name from table didn't fix it for
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.core.fs.FileSystem.WriteMode
> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.table.api.{Table, TableEnvironment}
> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
> import org.apache.flink.types.Row
>
> object datastreamtotableapi {
>
>   case class Calls(a: String, b: String, c: String, d: String, e: String,
>                    f: String, g: String, h: String, i: String, j: String,
>                    k: String, l: String, m: String, n: String, p: String,
>                    q: String, r: String, s: String, t: String, v: String,
>                    w: String)
>
>   def main(args: Array[String]) {
>     val params = ParameterTool.fromArgs(args)
>     val input = params.getRequired("input")
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     env.setParallelism(1)
>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>
>     val dataStream = env.readTextFile(input)
>
>     val namedStream = dataStream.map((value: String) => {
>       val columns = value.split(",")
>       Calls(columns(0), columns(1), columns(2), columns(3), columns(4),
>         columns(5), columns(6), columns(7), columns(8), columns(9),
>         columns(10), columns(11), columns(12), columns(13), columns(14),
>         columns(15), columns(16), columns(17), columns(18), columns(19),
>         columns(20))
>     })
>
>     val cleanedStream = namedStream.filter(_.j == " ").filter(_.k == " ")
>
>     val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
>       new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>         override def extractTimestamp(element: Calls): Long =
>           (element.j.concat(element.k)).toLong
>       })
>
>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>     val results = tableEnv.sqlQuery("""
>         |SELECT
>         |  a
>         |FROM CDRS
>       """.stripMargin)
>
>     val result: Table = results
>
>     val path = "file:///Users/test/1.txt"
>     val sink: TableSink[Row] = new CsvTableSink(
>       path,              // output path
>       fieldDelim = "|",  // optional: delimit files by '|'
>       numFiles = 1,      // optional: write to a single file
>       writeMode = WriteMode.OVERWRITE)
>
>     result.writeToSink(sink)
>
>     env.execute("this job")
>   }
> }
>
>
>
>
> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <ni...@data-artisans.com>
> wrote:
>
>> Hi Karim,
>> when I was trying to reproduce your code, I got an exception with the
>> name 'table' being used - by replacing it and completing the job with
>> some input, I did see the csv file popping up. Also, the job was
>> crashing when the file 1.txt already existed.
>>
>> The code I used (running Flink 1.5-SNAPSHOT):
>>
>> def main(args: Array[String]) {
>> // set up the streaming execution environment
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tableEnv = TableEnvironment.getTableEnvironment(env)
>>
>> val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
>> .assignAscendingTimestamps(_._2)
>> tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>
>> val results = tableEnv.sqlQuery( """
>> |SELECT
>> | A,C
>> | FROM mytable
>> """.stripMargin)
>>
>> val result: Table = results
>>
>> val path = "file:///tmp/test/1.txt"
>> val sink :TableSink[Row]= new CsvTableSink(
>> path, // output path
>> fieldDelim = "|", // optional: delimit files by '|'
>> numFiles = 1, // optional: write to a single
>> file
>> writeMode = WriteMode.NO_OVERWRITE)
>>
>> result.writeToSink(sink)
>>
>> env.execute("this job")
>> }
>>
>> def get3TupleDataStream(env: StreamExecutionEnvironment):
>> DataStream[(Int, Long, String)] = {
>> val data = new mutable.MutableList[(Int, Long, String)]
>> data.+=((1, 1L, "Hi"))
>> data.+=((2, 2L, "Hello"))
>> data.+=((3, 2L, "Hello world"))
>> data.+=((4, 3L, "Hello world, how are you?"))
>> data.+=((5, 3L, "I am fine."))
>> data.+=((6, 3L, "Luke Skywalker"))
>> env.fromCollection(data)
>> }
>>
>>
>> Nico
>>
>> On 16/03/18 22:50, karim amer wrote:
>> > Hi There,
>> >
>> > I am trying to write a CSVsink to disk but it's not getting written. I
>> > think the file is getting overwritten or truncated once The Stream
>> > process finishes. Does anyone know why the file is getting overwritten
>> > or truncated and how can i fix this ?
>> >
>> >
>> > tableEnv.registerDataStream("table", watermarkedStream)
>> >
>> > val results = tableEnv.sqlQuery( """
>> > |SELECT
>> > | A
>> > | FROM table
>> > """.stripMargin)
>> >
>> >
>> >
>> > val result: Table = results
>> >
>> > val path = "file:///path/test/1.txt"
>> > val sink :TableSink[Row]= new CsvTableSink(
>> > path, // output path
>> > fieldDelim = "|", // optional: delimit files by '|'
>> > numFiles = 1, // optional: write to a single file
>> > writeMode = WriteMode.NO_OVERWRITE)
>> >
>> > result.writeToSink(sink)
>> >
>> > env.execute("this job")
>> >
>> >
>> >
>> >
>> > Thanks
>>
>>
>
>
> --
> karim amer
>
>
Re: CsvSink
Posted by karim amer <ka...@gmail.com>.
Hi Nico,
I tried to reproduce your code, but registerDataStream keeps failing to
register the fields even though I am following your code and the docs.
Here is the error:
[error] found : Symbol
[error] required: org.apache.flink.table.expressions.Expression
[error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
[error]
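For context on the compile error above: in this era of Flink, the Scala field syntax ('A, 'B, 'C) only compiles when the Table API's Scala implicits are in scope, since they provide the conversion from Symbol to Expression. A minimal sketch of the likely fix, assuming a Flink 1.4/1.5-style project (the registerDataStream call is from the thread; the import lines are the suggested addition):

```scala
// Brings in TypeInformation implicits for the Scala DataStream API.
import org.apache.flink.api.scala._
// Brings in the implicit conversion from Scala Symbol ('A) to
// org.apache.flink.table.expressions.Expression, so field
// expressions like 'A, 'B, 'C compile.
import org.apache.flink.table.api.scala._

// With both imports in scope, the failing line should compile:
// tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C)
```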
I think my code snippet was misleading. Here is the full snippet. Changing
the name from "table" didn't fix it for me.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

object datastreamtotableapi {

  case class Calls(a: String, b: String, c: String, d: String, e: String,
                   f: String, g: String, h: String, i: String, j: String,
                   k: String, l: String, m: String, n: String, p: String,
                   q: String, r: String, s: String, t: String, v: String,
                   w: String)

  def main(args: Array[String]) {
    val params = ParameterTool.fromArgs(args)
    val input = params.getRequired("input")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataStream = env.readTextFile(input)

    val namedStream = dataStream.map((value: String) => {
      val columns = value.split(",")
      Calls(columns(0), columns(1), columns(2), columns(3), columns(4),
        columns(5), columns(6), columns(7), columns(8), columns(9),
        columns(10), columns(11), columns(12), columns(13), columns(14),
        columns(15), columns(16), columns(17), columns(18), columns(19),
        columns(20))
    })

    val cleanedStream = namedStream.filter(_.j == " ").filter(_.k == " ")

    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
        override def extractTimestamp(element: Calls): Long =
          (element.j.concat(element.k)).toLong
      })

    tableEnv.registerDataStream("CDRS", watermarkedStream)
    val results = tableEnv.sqlQuery("""
        |SELECT
        |  a
        |FROM CDRS
      """.stripMargin)

    val result: Table = results

    val path = "file:///Users/test/1.txt"
    val sink: TableSink[Row] = new CsvTableSink(
      path,              // output path
      fieldDelim = "|",  // optional: delimit files by '|'
      numFiles = 1,      // optional: write to a single file
      writeMode = WriteMode.OVERWRITE)

    result.writeToSink(sink)

    env.execute("this job")
  }
}
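A side note on the extractor in the snippet above: (element.j.concat(element.k)).toLong throws NumberFormatException on blank or non-numeric input, which fails the whole job before anything reaches the sink. A standalone sketch of a more defensive variant; safeTimestamp and its fallback value are hypothetical, not from the thread:

```scala
import scala.util.Try

// Hypothetical helper: parse the concatenated j + k fields as an event-time
// timestamp, falling back to a sentinel value instead of throwing
// NumberFormatException from inside the timestamp extractor.
def safeTimestamp(j: String, k: String, fallback: Long = 0L): Long =
  Try((j.trim + k.trim).toLong).getOrElse(fallback)

val ok  = safeTimestamp("1521", "234567") // 1521234567L
val bad = safeTimestamp(" ", " ")         // non-numeric: falls back to 0L
```

Records that fall back could also be filtered out before assigning watermarks, so one malformed CSV line cannot take down the job.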
On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <ni...@data-artisans.com> wrote:
> Hi Karim,
> when I was trying to reproduce your code, I got an exception with the
> name 'table' being used - by replacing it and completing the job with
> some input, I did see the csv file popping up. Also, the job was
> crashing when the file 1.txt already existed.
>
> The code I used (running Flink 1.5-SNAPSHOT):
>
> def main(args: Array[String]) {
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = TableEnvironment.getTableEnvironment(env)
>
> val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
> .assignAscendingTimestamps(_._2)
> tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>
> val results = tableEnv.sqlQuery( """
> |SELECT
> | A,C
> | FROM mytable
> """.stripMargin)
>
> val result: Table = results
>
> val path = "file:///tmp/test/1.txt"
> val sink :TableSink[Row]= new CsvTableSink(
> path, // output path
> fieldDelim = "|", // optional: delimit files by '|'
> numFiles = 1, // optional: write to a single file
> writeMode = WriteMode.NO_OVERWRITE)
>
> result.writeToSink(sink)
>
> env.execute("this job")
> }
>
> def get3TupleDataStream(env: StreamExecutionEnvironment):
> DataStream[(Int, Long, String)] = {
> val data = new mutable.MutableList[(Int, Long, String)]
> data.+=((1, 1L, "Hi"))
> data.+=((2, 2L, "Hello"))
> data.+=((3, 2L, "Hello world"))
> data.+=((4, 3L, "Hello world, how are you?"))
> data.+=((5, 3L, "I am fine."))
> data.+=((6, 3L, "Luke Skywalker"))
> env.fromCollection(data)
> }
>
>
> Nico
>
--
karim amer
Re: CsvSink
Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Karim,
when I was trying to reproduce your code, I got an exception because the
name 'table' was being used (TABLE is a reserved SQL keyword). After
replacing it and completing the job with some input, I did see the CSV
file appear. Also, the job crashed when the file 1.txt already existed,
which is the expected behaviour of WriteMode.NO_OVERWRITE.
The code I used (running Flink 1.5-SNAPSHOT):
def main(args: Array[String]) {
  // set up the streaming execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tableEnv = TableEnvironment.getTableEnvironment(env)

  val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
    .assignAscendingTimestamps(_._2)
  tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)

  val results = tableEnv.sqlQuery("""
      |SELECT
      |  A, C
      |FROM mytable
    """.stripMargin)

  val result: Table = results

  val path = "file:///tmp/test/1.txt"
  val sink: TableSink[Row] = new CsvTableSink(
    path,              // output path
    fieldDelim = "|",  // optional: delimit files by '|'
    numFiles = 1,      // optional: write to a single file
    writeMode = WriteMode.NO_OVERWRITE)

  result.writeToSink(sink)

  env.execute("this job")
}

def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
  val data = new mutable.MutableList[(Int, Long, String)]
  data.+=((1, 1L, "Hi"))
  data.+=((2, 2L, "Hello"))
  data.+=((3, 2L, "Hello world"))
  data.+=((4, 3L, "Hello world, how are you?"))
  data.+=((5, 3L, "I am fine."))
  data.+=((6, 3L, "Luke Skywalker"))
  env.fromCollection(data)
}
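As a sanity check that needs no Flink at all, the expected contents of 1.txt for this sample can be derived by hand: with fieldDelim = "|" and SELECT A, C, each tuple becomes one "A|C" line. A plain-Scala approximation of what the sink should write (the mapping below is a hand-rolled illustration, not the CsvTableSink implementation):

```scala
// The sample rows produced by get3TupleDataStream above.
val data = Seq(
  (1, 1L, "Hi"),
  (2, 2L, "Hello"),
  (3, 2L, "Hello world"),
  (4, 3L, "Hello world, how are you?"),
  (5, 3L, "I am fine."),
  (6, 3L, "Luke Skywalker"))

// SELECT A, C with a '|' delimiter yields one "A|C" line per row.
val csvLines = data.map { case (a, _, c) => s"$a|$c" }
// First line of the expected file: 1|Hi
```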
Nico