Posted to user@flink.apache.org by karim amer <ka...@gmail.com> on 2018/03/16 21:50:14 UTC

CsvSink

Hi There,

 I am trying to write a table to disk with a CsvTableSink, but the output is
not getting written. I think the file is being overwritten or truncated once
the stream process finishes. Does anyone know why this happens and how I can
fix it?


tableEnv.registerDataStream("table", watermarkedStream)

val results = tableEnv.sqlQuery( """
                                  |SELECT
                                  | A
                                  | FROM table
                                   """.stripMargin)



val result: Table = results

val path = "file:///path/test/1.txt"
val sink :TableSink[Row]=   new CsvTableSink(
  path,                             // output path
  fieldDelim = "|",                 // optional: delimit files by '|'
  numFiles = 1,                     // optional: write to a single file
  writeMode = WriteMode.NO_OVERWRITE)

result.writeToSink(sink)

env.execute("this job")




Thanks

Re: CsvSink

Posted by Fabian Hueske <fh...@gmail.com>.
Great, thanks for reporting back!

Best, Fabian

2018-03-20 22:40 GMT+01:00 karim amer <ka...@gmail.com>:

> Never mind, I found the error and it has nothing to do with Flink.
> Sorry
>
> On Tue, Mar 20, 2018 at 12:12 PM, karim amer <ka...@gmail.com>
> wrote:
>
>> Here is the output after fixing the Scala issues:
>>
>> https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c
>>
>> On Tue, Mar 20, 2018 at 11:39 AM, karim amer <ka...@gmail.com>
>> wrote:
>>
>>> Never mind. After importing
>>>
>>> import org.apache.flink.api.scala._
>>>
>>> these errors went away, but I still have the original problem.
>>> Sorry, my bad.
>>>
>>> On Tue, Mar 20, 2018 at 11:04 AM, karim amer <ka...@gmail.com>
>>> wrote:
>>>
>>>> To clarify, should I file a bug report about sbt hiding the errors from the
>>>> previous email?
>>>>
>>>> On Tue, Mar 20, 2018 at 9:44 AM, karim amer <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> After switching from sbt to Maven I got these errors:
>>>>> Error:(63, 37) could not find implicit value for evidence parameter of
>>>>> type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
>>>>>     val namedStream = dataStream.map((value:String) => {
>>>>>
>>>>>
>>>>> Error:(63, 37) not enough arguments for method map: (implicit
>>>>> evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
>>>>> Unspecified value parameter evidence$7.
>>>>>     val namedStream = dataStream.map((value:String) => {
>>>>>
>>>>>
>>>>> Should I file a bug report?
>>>>>
>>>>> On Tue, Mar 20, 2018 at 9:30 AM, karim amer <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Fabian,
>>>>>> Sorry if I confused you. The first error is from Nico's code, not my
>>>>>> code or snippet.
>>>>>> I am still having the original problem in my snippet, where it's
>>>>>> writing a blank CSV file even though I get
>>>>>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>>>>>> after running the job.
>>>>>>
>>>>>> Cheers,
>>>>>> karim
>>>>>>
>>>>>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Karim,
>>>>>>>
>>>>>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
>>>>>>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>>>>>>> It would help if you would keep error message and code consistent.
>>>>>>> Otherwise it's not possible to figure out what's going on.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2018-03-20 0:24 GMT+01:00 karim amer <ka...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Nico,
>>>>>>>>
>>>>>>>> I tried to reproduce your code, but registerDataStream keeps
>>>>>>>> failing to register the fields even though I am following your code and the
>>>>>>>> docs.
>>>>>>>> Here is the error:
>>>>>>>> [error]  found   : Symbol
>>>>>>>> [error]  required: org.apache.flink.table.expressions.Expression
>>>>>>>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>>>>>>> [error]
>>>>>>>> I think my code snippet was misleading. Here is the full snippet.
>>>>>>>> Changing the name from table didn't fix it for me.
>>>>>>>>
>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>> import org.apache.flink.api.java.utils.ParameterTool
>>>>>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>>>>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>>>>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>>>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>>>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>>>>>>> import org.apache.flink.types.Row
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> object datastreamtotableapi {
>>>>>>>>
>>>>>>>>   case class Calls(a: String,
>>>>>>>>                    b: String,
>>>>>>>>                    c: String,
>>>>>>>>                    d: String,
>>>>>>>>                    e: String,
>>>>>>>>                    f: String,
>>>>>>>>                    g: String,
>>>>>>>>                    h: String,
>>>>>>>>                    i: String,
>>>>>>>>                    j: String,
>>>>>>>>                    k: String,
>>>>>>>>                    l: String,
>>>>>>>>                    m: String,
>>>>>>>>                    n: String,
>>>>>>>>                    p: String,
>>>>>>>>                    q: String,
>>>>>>>>                    r: String,
>>>>>>>>                    s: String,
>>>>>>>>                    t: String,
>>>>>>>>                    v: String,
>>>>>>>>                    w: String)
>>>>>>>>
>>>>>>>>
>>>>>>>>   def main(args: Array[String]) {
>>>>>>>>
>>>>>>>>     val params = ParameterTool.fromArgs(args)
>>>>>>>>     val input = params.getRequired("input")
>>>>>>>>
>>>>>>>>
>>>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>>>     env.setParallelism(1)
>>>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>>>
>>>>>>>>     val dataStream = env.readTextFile(input)
>>>>>>>>
>>>>>>>>     val namedStream = dataStream.map((value:String) => {
>>>>>>>>
>>>>>>>>       val columns = value.split(",")
>>>>>>>>       Calls(columns(0), columns(1),columns(2),columns(3), columns(4),columns(5),
>>>>>>>>         columns(6), columns(7),columns(8),columns(9), columns(10), columns(11),
>>>>>>>>         columns(12), columns(13),columns(14),columns(15), columns(16), columns(17),
>>>>>>>>         columns(18),columns(19), columns(20)
>>>>>>>>       )
>>>>>>>>     })
>>>>>>>>
>>>>>>>>
>>>>>>>>    val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")
>>>>>>>>
>>>>>>>>    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>>>>>>      override def extractTimestamp(element: Calls): Long = (element.j.concat(element.k)).toLong
>>>>>>>>    })
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>>>>>>     val results = tableEnv.sqlQuery( """
>>>>>>>>                                       |SELECT
>>>>>>>>                                       | a
>>>>>>>>                                       | FROM CDRS
>>>>>>>>                                        """.stripMargin)
>>>>>>>>
>>>>>>>>
>>>>>>>>     val result: Table = results
>>>>>>>>
>>>>>>>>     val path = "file:///Users/test/1.txt"
>>>>>>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>>>>>>       path,                             // output path
>>>>>>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>>>>>>       numFiles = 1,                     // optional: write to a single file
>>>>>>>>       writeMode = WriteMode.OVERWRITE)
>>>>>>>>
>>>>>>>>     result.writeToSink(sink)
>>>>>>>>
>>>>>>>>
>>>>>>>>     env.execute("this job")
>>>>>>>>
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <
>>>>>>>> nico@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Karim,
>>>>>>>>> when I was trying to reproduce your code, I got an exception with the
>>>>>>>>> name 'table' being used - by replacing it and completing the job with
>>>>>>>>> some input, I did see the csv file popping up. Also, the job was
>>>>>>>>> crashing when the file 1.txt already existed.
>>>>>>>>>
>>>>>>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>>>>>>
>>>>>>>>>   def main(args: Array[String]) {
>>>>>>>>>     // set up the streaming execution environment
>>>>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>>>>
>>>>>>>>>     val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
>>>>>>>>>       .assignAscendingTimestamps(_._2)
>>>>>>>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>>>>>>>
>>>>>>>>>     val results = tableEnv.sqlQuery( """
>>>>>>>>>                                        |SELECT
>>>>>>>>>                                        | A,C
>>>>>>>>>                                        | FROM mytable
>>>>>>>>>                                      """.stripMargin)
>>>>>>>>>
>>>>>>>>>     val result: Table = results
>>>>>>>>>
>>>>>>>>>     val path = "file:///tmp/test/1.txt"
>>>>>>>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>>>>>>>       path,                             // output path
>>>>>>>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>>>>>>>       numFiles = 1,                     // optional: write to a single file
>>>>>>>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>>>>>>>
>>>>>>>>>     result.writeToSink(sink)
>>>>>>>>>
>>>>>>>>>     env.execute("this job")
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
>>>>>>>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>>>>>>>     data.+=((1, 1L, "Hi"))
>>>>>>>>>     data.+=((2, 2L, "Hello"))
>>>>>>>>>     data.+=((3, 2L, "Hello world"))
>>>>>>>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>>>>>>>     data.+=((5, 3L, "I am fine."))
>>>>>>>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>>>>>>>     env.fromCollection(data)
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Nico
>>>>>>>>>
>>>>>>>>> On 16/03/18 22:50, karim amer wrote:
>>>>>>>>> > Hi There,
>>>>>>>>> >
>>>>>>>>> >  I am trying to write a CSVsink to disk but it's not getting written. I
>>>>>>>>> > think the file is getting overwritten or truncated once The Stream
>>>>>>>>> > process finishes. Does anyone know why the file is getting overwritten
>>>>>>>>> > or truncated and how can i fix this ?
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>>>>>>>> >
>>>>>>>>> > val results = tableEnv.sqlQuery( """
>>>>>>>>> > |SELECT
>>>>>>>>> > | A
>>>>>>>>> > | FROM table
>>>>>>>>> > """.stripMargin)
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > val result: Table = results
>>>>>>>>> >
>>>>>>>>> > val path = "file:///path/test/1.txt"
>>>>>>>>> > val sink :TableSink[Row]=   new CsvTableSink(
>>>>>>>>> >   path, // output path
>>>>>>>>> > fieldDelim = "|", // optional: delimit files by '|'
>>>>>>>>> > numFiles = 1, // optional: write to a single file
>>>>>>>>> > writeMode = WriteMode.NO_OVERWRITE)
>>>>>>>>> >
>>>>>>>>> > result.writeToSink(sink)
>>>>>>>>> >
>>>>>>>>> > env.execute("this job")
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > Thanks

Re: CsvSink

Posted by karim amer <ka...@gmail.com>.
Never mind, I found the error and it has nothing to do with Flink.
Sorry


-- 
karim amer

Re: CsvSink

Posted by karim amer <ka...@gmail.com>.
Here is the output after fixing the Scala issues:

https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c

On Tue, Mar 20, 2018 at 11:39 AM, karim amer <ka...@gmail.com>
wrote:

> Never mind after importing
>
> import org.apache.flink.api.scala._
>
> theses errors went away and i still have the original problem.
> Sorry my bad
>
> On Tue, Mar 20, 2018 at 11:04 AM, karim amer <ka...@gmail.com>
> wrote:
>
>> To clarify should i file a bug report on sbt hiding the errors in the
>> previous email ?
>>
>> On Tue, Mar 20, 2018 at 9:44 AM, karim amer <ka...@gmail.com>
>> wrote:
>>
>>> After switching to Maven from Sbt I got these errors
>>> Error:(63, 37) could not find implicit value for evidence parameter of
>>> type org.apache.flink.api.common.typeinfo.TypeInformation[org.apa
>>> che.flink.quickstart.DataStreamtotableapi.Calls]
>>>     val namedStream = dataStream.map((value:String) => {
>>>
>>>
>>> Error:(63, 37) not enough arguments for method map: (implicit
>>> evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apa
>>> che.flink.quickstart.DataStreamtotableapi.Calls])org.apache.
>>> flink.streaming.api.scala.DataStream[org.apache.flink.quicks
>>> tart.DataStreamtotableapi.Calls].
>>> Unspecified value parameter evidence$7.
>>>     val namedStream = dataStream.map((value:String) => {
>>>
>>>
>>> Should i file a bug report  ?
>>>
>>> On Tue, Mar 20, 2018 at 9:30 AM, karim amer <ka...@gmail.com>
>>> wrote:
>>>
>>>> Hi Fabian
>>>> Sorry if i confused you The first error is from Nico's code Not my code
>>>> or  snippet
>>>> I am still having the original problem in my snippet where it's writing
>>>> a blank csv file even though i get
>>>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>>>> After running the job
>>>>
>>>> Cheers,
>>>> karim
>>>>
>>>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Karim,
>>>>>
>>>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
>>>>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>>>>> It would help if you would keep error message and code consistent.
>>>>> Otherwise it's not possible to figure out what's going on.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2018-03-20 0:24 GMT+01:00 karim amer <ka...@gmail.com>:
>>>>>
>>>>>> Hi Nico,
>>>>>>
>>>>>> I tried to reproduce your code but registerDataStream keeps failing
>>>>>> to register the fields even though i am following your code and the Docs.
>>>>>> here is the error
>>>>>> [error]  found   : Symbol
>>>>>> [error]  required: org.apache.flink.table.expressions.Expression
>>>>>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>>>>> [error]
>>>>>> I think my code snippet was misleading. Here is the full snippet
>>>>>> Changing the name from table didn't fix it for
>>>>>>
>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>> import org.apache.flink.api.java.utils.ParameterTool
>>>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>>>>> import org.apache.flink.types.Row
>>>>>>
>>>>>>
>>>>>>
>>>>>> object datastreamtotableapi {
>>>>>>
>>>>>>   case class Calls(a: String,
>>>>>>                    b: String,
>>>>>>                    c: String,
>>>>>>                    d: String,
>>>>>>                    e: String,
>>>>>>                    f: String,
>>>>>>                    g: String,
>>>>>>                    h: String,
>>>>>>                    i: String,
>>>>>>                    j: String,
>>>>>>                    k: String,
>>>>>>                    l: String,
>>>>>>                    m: String,
>>>>>>                    n: String,
>>>>>>                    p: String,
>>>>>>                    q: String,
>>>>>>                    r: String,
>>>>>>                    s: String,
>>>>>>                    t: String,
>>>>>>                    v: String,
>>>>>>                    w: String)
>>>>>>
>>>>>>
>>>>>>   def main(args: Array[String]) {
>>>>>>
>>>>>>     val params = ParameterTool.fromArgs(args)
>>>>>>     val input = params.getRequired("input")
>>>>>>
>>>>>>
>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>     env.setParallelism(1)
>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>
>>>>>>     val dataStream = env.readTextFile(input)
>>>>>>
>>>>>>     val namedStream = dataStream.map((value:String) => {
>>>>>>
>>>>>>       val columns = value.split(",")
>>>>>>       Calls(columns(0), columns(1),columns(2),columns(3), columns(4),columns(5),
>>>>>>         columns(6), columns(7),columns(8),columns(9), columns(10), columns(11),
>>>>>>         columns(12), columns(13),columns(14),columns(15), columns(16), columns(17),
>>>>>>         columns(18),columns(19), columns(20)
>>>>>>       )
>>>>>>     })
>>>>>>
>>>>>>
>>>>>>    val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")
>>>>>>
>>>>>>    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>>>>      override def extractTimestamp(element: Calls): Long = (element.j.concat(element.k)).toLong
>>>>>>    })
>>>>>>
>>>>>>
>>>>>>
>>>>>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>>>>     val results = tableEnv.sqlQuery( """
>>>>>>                                       |SELECT
>>>>>>                                       | a
>>>>>>                                       | FROM CDRS
>>>>>>                                        """.stripMargin)
>>>>>>
>>>>>>
>>>>>>     val result: Table = results
>>>>>>
>>>>>>     val path = "file:///Users/test/1.txt"
>>>>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>>>>       path,                             // output path
>>>>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>>>>       numFiles = 1,                     // optional: write to a single file
>>>>>>       writeMode = WriteMode.OVERWRITE)
>>>>>>
>>>>>>     result.writeToSink(sink)
>>>>>>
>>>>>>
>>>>>>     env.execute("this job")
>>>>>>
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <ni...@data-artisans.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Karim,
>>>>>>> when I was trying to reproduce your code, I got an exception with the
>>>>>>> name 'table' being used - by replacing it and completing the job with
>>>>>>> some input, I did see the csv file popping up. Also, the job was
>>>>>>> crashing when the file 1.txt already existed.
>>>>>>>
>>>>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>>>>
>>>>>>>   def main(args: Array[String]) {
>>>>>>>     // set up the streaming execution environment
>>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>>
>>>>>>>     val stream: DataStream[(Int, Long, String)] =
>>>>>>> get3TupleDataStream(env)
>>>>>>>       .assignAscendingTimestamps(_._2)
>>>>>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>>>>>
>>>>>>>     val results = tableEnv.sqlQuery( """
>>>>>>>                                        |SELECT
>>>>>>>                                        | A,C
>>>>>>>                                        | FROM mytable
>>>>>>>                                      """.stripMargin)
>>>>>>>
>>>>>>>     val result: Table = results
>>>>>>>
>>>>>>>     val path = "file:///tmp/test/1.txt"
>>>>>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>>>>>       path,                             // output path
>>>>>>>       fieldDelim = "|",                 // optional: delimit files
>>>>>>> by '|'
>>>>>>>       numFiles = 1,                     // optional: write to a
>>>>>>> single file
>>>>>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>>>>>
>>>>>>>     result.writeToSink(sink)
>>>>>>>
>>>>>>>     env.execute("this job")
>>>>>>>   }
>>>>>>>
>>>>>>>   def get3TupleDataStream(env: StreamExecutionEnvironment):
>>>>>>> DataStream[(Int, Long, String)] = {
>>>>>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>>>>>     data.+=((1, 1L, "Hi"))
>>>>>>>     data.+=((2, 2L, "Hello"))
>>>>>>>     data.+=((3, 2L, "Hello world"))
>>>>>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>>>>>     data.+=((5, 3L, "I am fine."))
>>>>>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>>>>>     env.fromCollection(data)
>>>>>>>   }
>>>>>>>
>>>>>>>
>>>>>>> Nico
>>>>>>>
>>>>>>> On 16/03/18 22:50, karim amer wrote:
>>>>>>> > Hi There,
>>>>>>> >
>>>>>>> >  I am trying to write a CSV sink to disk, but the output is not
>>>>>>> > getting written. I think the file is getting overwritten or truncated
>>>>>>> > once the stream process finishes. Does anyone know why the file is
>>>>>>> > getting overwritten or truncated, and how I can fix this?
>>>>>>> >
>>>>>>> >
>>>>>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>>>>>> >
>>>>>>> > val results = tableEnv.sqlQuery( """
>>>>>>> > |SELECT
>>>>>>> > | A
>>>>>>> > | FROM table
>>>>>>> > """.stripMargin)
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > val result: Table = results
>>>>>>> >
>>>>>>> > val path = "file:///path/test/1.txt"
>>>>>>> > val sink :TableSink[Row]=   new CsvTableSink(
>>>>>>> >   path, // output path
>>>>>>> > fieldDelim = "|", // optional: delimit files by '|'
>>>>>>> > numFiles = 1, // optional: write to a single file
>>>>>>> > writeMode = WriteMode.NO_OVERWRITE)
>>>>>>> >
>>>>>>> > result.writeToSink(sink)
>>>>>>> >
>>>>>>> > env.execute("this job")
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > Thanks
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> karim amer
>>>>>>
>>>>>>


-- 
karim amer

Re: CsvSink

Posted by karim amer <ka...@gmail.com>.
Never mind. After importing

import org.apache.flink.api.scala._

these errors went away, but I still have the original problem.
Sorry, my bad.

On Tue, Mar 20, 2018 at 11:04 AM, karim amer <ka...@gmail.com>
wrote:

> To clarify: should I file a bug report about sbt hiding the errors in the
> previous email?
>
> On Tue, Mar 20, 2018 at 9:44 AM, karim amer <ka...@gmail.com>
> wrote:
>
>> After switching from sbt to Maven, I got these errors:
>> Error:(63, 37) could not find implicit value for evidence parameter of
>> type org.apache.flink.api.common.typeinfo.TypeInformation[org.apa
>> che.flink.quickstart.DataStreamtotableapi.Calls]
>>     val namedStream = dataStream.map((value:String) => {
>>
>>
>> Error:(63, 37) not enough arguments for method map: (implicit evidence$7:
>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apa
>> che.flink.quickstart.DataStreamtotableapi.Calls])org.apache.
>> flink.streaming.api.scala.DataStream[org.apache.flink.
>> quickstart.DataStreamtotableapi.Calls].
>> Unspecified value parameter evidence$7.
>>     val namedStream = dataStream.map((value:String) => {
>>
>>
>> Should I file a bug report?
>>
>> On Tue, Mar 20, 2018 at 9:30 AM, karim amer <ka...@gmail.com>
>> wrote:
>>
>>> Hi Fabian,
>>> Sorry if I confused you. The first error is from Nico's code, not from my
>>> code or snippet.
>>> I am still having the original problem in my snippet: it writes a blank
>>> CSV file even though I get
>>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>>> after running the job.
>>>
>>> Cheers,
>>> karim
>>>
>>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fh...@gmail.com>
>>> wrote:
>>>
>>>> Hi Karim,
>>>>
>>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
>>>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>>>> It would help if you would keep error message and code consistent.
>>>> Otherwise it's not possible to figure out what's going on.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-03-20 0:24 GMT+01:00 karim amer <ka...@gmail.com>:
>>>>
>>>>> Hi Nico,
>>>>>
>>>>> I tried to reproduce your code, but registerDataStream keeps failing
>>>>> to register the fields even though I am following your code and the docs.
>>>>> Here is the error:
>>>>> [error]  found   : Symbol
>>>>> [error]  required: org.apache.flink.table.expressions.Expression
>>>>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>>>> [error]
>>>>> I think my code snippet was misleading. Here is the full snippet.
>>>>> Changing the name from table didn't fix it for me.
>>>>>
>>>>> import org.apache.flink.streaming.api.scala._
>>>>> import org.apache.flink.api.java.utils.ParameterTool
>>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>>>> import org.apache.flink.types.Row
>>>>>
>>>>>
>>>>>
>>>>> object datastreamtotableapi {
>>>>>
>>>>>   case class Calls(a: String,
>>>>>                    b: String,
>>>>>                    c: String,
>>>>>                    d: String,
>>>>>                    e: String,
>>>>>                    f: String,
>>>>>                    g: String,
>>>>>                    h: String,
>>>>>                    i: String,
>>>>>                    j: String,
>>>>>                    k: String,
>>>>>                    l: String,
>>>>>                    m: String,
>>>>>                    n: String,
>>>>>                    p: String,
>>>>>                    q: String,
>>>>>                    r: String,
>>>>>                    s: String,
>>>>>                    t: String,
>>>>>                    v: String,
>>>>>                    w: String)
>>>>>
>>>>>
>>>>>   def main(args: Array[String]) {
>>>>>
>>>>>     val params = ParameterTool.fromArgs(args)
>>>>>     val input = params.getRequired("input")
>>>>>
>>>>>
>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>     env.setParallelism(1)
>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>
>>>>>     val dataStream = env.readTextFile(input)
>>>>>
>>>>>     val namedStream = dataStream.map((value:String) => {
>>>>>
>>>>>       val columns = value.split(",")
>>>>>       Calls(columns(0), columns(1),columns(2),columns(3), columns(4),columns(5),
>>>>>         columns(6), columns(7),columns(8),columns(9), columns(10), columns(11),
>>>>>         columns(12), columns(13),columns(14),columns(15), columns(16), columns(17),
>>>>>         columns(18),columns(19), columns(20)
>>>>>       )
>>>>>     })
>>>>>
>>>>>
>>>>>    val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")
>>>>>
>>>>>    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>>>      override def extractTimestamp(element: Calls): Long = (element.j.concat(element.k)).toLong
>>>>>    })
>>>>>
>>>>>
>>>>>
>>>>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>>>     val results = tableEnv.sqlQuery( """
>>>>>                                       |SELECT
>>>>>                                       | a
>>>>>                                       | FROM CDRS
>>>>>                                        """.stripMargin)
>>>>>
>>>>>
>>>>>     val result: Table = results
>>>>>
>>>>>     val path = "file:///Users/test/1.txt"
>>>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>>>       path,                             // output path
>>>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>>>       numFiles = 1,                     // optional: write to a single file
>>>>>       writeMode = WriteMode.OVERWRITE)
>>>>>
>>>>>     result.writeToSink(sink)
>>>>>
>>>>>
>>>>>     env.execute("this job")
>>>>>
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> karim amer
>>>>>
>>>>>


-- 
karim amer

Re: CsvSink

Posted by karim amer <ka...@gmail.com>.
To clarify: should I file a bug report about sbt hiding the errors in the
previous email?


-- 
karim amer

Re: CsvSink

Posted by karim amer <ka...@gmail.com>.
After switching from sbt to Maven, I got these errors:
Error:(63, 37) could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
    val namedStream = dataStream.map((value:String) => {


Error:(63, 37) not enough arguments for method map: (implicit evidence$7:
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
Unspecified value parameter evidence$7.
    val namedStream = dataStream.map((value:String) => {


Should I file a bug report?
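
A later reply in this thread reports that importing org.apache.flink.api.scala._ made these errors go away. The errors name an "evidence parameter", which is how the Scala 2 compiler refers to the implicit parameter generated by a context bound. The sketch below is a plain-Scala analogy and not Flink code: TypeInfo, Implicits, MiniStream, and EvidenceDemo are hypothetical names invented for illustration, with TypeInfo standing in for TypeInformation and Implicits standing in for the package whose import brings the implicit values into scope.

```scala
// Hypothetical stand-in for a type class such as Flink's TypeInformation.
trait TypeInfo[T] { def describe: String }

// Hypothetical holder of implicit instances. Its members are only visible
// to implicit search after an import, like the package import in the thread.
object Implicits {
  implicit val intInfo: TypeInfo[Int] = new TypeInfo[Int] {
    def describe: String = "Int"
  }
}

object MiniStream {
  // The context bound `R: TypeInfo` desugars to an extra implicit parameter
  // list `(implicit evidence$1: TypeInfo[R])`, the kind of parameter the
  // compiler error refers to.
  def map[T, R: TypeInfo](items: Seq[T])(f: T => R): (Seq[R], String) =
    (items.map(f), implicitly[TypeInfo[R]].describe)
}

object EvidenceDemo {
  // Without this import, the call in run() does not compile under Scala 2:
  // "could not find implicit value for evidence parameter of type TypeInfo[Int]".
  import Implicits._

  def run(): (Seq[Int], String) =
    MiniStream.map(Seq("a", "bb", "ccc"))(_.length)

  def main(args: Array[String]): Unit = println(run())
}
```

Under these assumptions, the error points to a missing import in the user code and not to a defect in the build tool.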


-- 
karim amer

Re: CsvSink

Posted by karim amer <ka...@gmail.com>.
Hi Fabian,
Sorry if I confused you. The first error is from Nico's code, not from my code
or snippet.
I am still having the original problem in my snippet: it writes a blank CSV
file even though I get
[success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
after running the job.

Cheers,
karim
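
A job that finishes successfully but leaves an empty file can mean that no rows reached the sink. One way to check this outside Flink is to run the job's filter predicates on a few sample rows. The sketch below is plain Scala under stated assumptions: Call is a hypothetical, trimmed-down version of the Calls case class in the posted snippet, the sample rows are made up, and the thread never states what the actual cause was.

```scala
// Hypothetical, trimmed-down version of the Calls case class from the thread.
final case class Call(a: String, j: String, k: String)

object FilterCheck {
  // Same predicates as in the posted job: keep rows whose j and k are exactly " ".
  def keepBlank(rows: Seq[Call]): Seq[Call] =
    rows.filter(_.j == " ").filter(_.k == " ")

  // Inverted predicates, for comparison: keep rows whose j and k are not blank.
  def keepNonBlank(rows: Seq[Call]): Seq[Call] =
    rows.filter(_.j.trim.nonEmpty).filter(_.k.trim.nonEmpty)

  // Made-up sample rows; real input would come from the job's input file.
  val sample: Seq[Call] = Seq(
    Call("x", "2018", "0316"),
    Call("y", " ", " "),
    Call("z", "2018", "0320")
  )

  def main(args: Array[String]): Unit = {
    println(s"blank-only rows: ${keepBlank(sample).size}")
    println(s"non-blank rows:  ${keepNonBlank(sample).size}")
  }
}
```

If the input contains no rows with blank j and k, the predicates as posted pass nothing downstream, which would be consistent with an empty output file. Rows that do pass would have j and k concatenated to a blank string, and calling toLong on a blank string throws a NumberFormatException.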

On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Karim,
>
> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
> set, 'A, 'B, 'C )" as shown in the error message in your example.
> It would help if you would keep error message and code consistent.
> Otherwise it's not possible to figure out what's going on.
>
> Best, Fabian
>
> 2018-03-20 0:24 GMT+01:00 karim amer <ka...@gmail.com>:
>
>> Hi Nico,
>>
>> I tried to reproduce your code, but registerDataStream keeps failing to
>> register the fields even though I am following your code and the docs.
>> Here is the error:
>> [error]  found   : Symbol
>> [error]  required: org.apache.flink.table.expressions.Expression
>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>> [error]
>> I think my code snippet was misleading. Here is the full snippet. Changing
>> the name from table didn't fix it for me.
>>
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.core.fs.FileSystem.WriteMode
>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.table.api.{Table, TableEnvironment}
>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>> import org.apache.flink.types.Row
>>
>>
>>
>> object datastreamtotableapi {
>>
>>   case class Calls(a: String,
>>                    b: String,
>>                    c: String,
>>                    d: String,
>>                    e: String,
>>                    f: String,
>>                    g: String,
>>                    h: String,
>>                    i: String,
>>                    j: String,
>>                    k: String,
>>                    l: String,
>>                    m: String,
>>                    n: String,
>>                    p: String,
>>                    q: String,
>>                    r: String,
>>                    s: String,
>>                    t: String,
>>                    v: String,
>>                    w: String)
>>
>>
>>   def main(args: Array[String]) {
>>
>>     val params = ParameterTool.fromArgs(args)
>>     val input = params.getRequired("input")
>>
>>
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     env.setParallelism(1)
>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>
>>     val dataStream = env.readTextFile(input)
>>
>>     val namedStream = dataStream.map((value:String) => {
>>
>>       val columns = value.split(",")
>>       Calls(columns(0), columns(1),columns(2),columns(3), columns(4),columns(5),
>>         columns(6), columns(7),columns(8),columns(9), columns(10), columns(11),
>>         columns(12), columns(13),columns(14),columns(15), columns(16), columns(17),
>>         columns(18),columns(19), columns(20)
>>       )
>>     })
>>
>>
>>    val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")
>>
>>    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>      override def extractTimestamp(element: Calls): Long = (element.j.concat(element.k)).toLong
>>    })
>>
>>
>>
>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>     val results = tableEnv.sqlQuery( """
>>                                       |SELECT
>>                                       | a
>>                                       | FROM CDRS
>>                                        """.stripMargin)
>>
>>
>>     val result: Table = results
>>
>>     val path = "file:///Users/test/1.txt"
>>     val sink :TableSink[Row]=   new CsvTableSink(
>>       path,                             // output path
>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>       numFiles = 1,                     // optional: write to a single file
>>       writeMode = WriteMode.OVERWRITE)
>>
>>     result.writeToSink(sink)
>>
>>
>>     env.execute("this job")
>>
>>   }
>> }
>>
>>
>>
>>
>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <ni...@data-artisans.com>
>> wrote:
>>
>>> Hi Karim,
>>> when I was trying to reproduce your code, I got an exception with the
>>> name 'table' being used - by replacing it and completing the job with
>>> some input, I did see the csv file popping up. Also, the job was
>>> crashing when the file 1.txt already existed.
>>>
>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>
>>>   def main(args: Array[String]) {
>>>     // set up the streaming execution environment
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>
>>>     val stream: DataStream[(Int, Long, String)] =
>>> get3TupleDataStream(env)
>>>       .assignAscendingTimestamps(_._2)
>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>
>>>     val results = tableEnv.sqlQuery( """
>>>                                        |SELECT
>>>                                        | A,C
>>>                                        | FROM mytable
>>>                                      """.stripMargin)
>>>
>>>     val result: Table = results
>>>
>>>     val path = "file:///tmp/test/1.txt"
>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>       path,                             // output path
>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>       numFiles = 1,                     // optional: write to a single
>>> file
>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>
>>>     result.writeToSink(sink)
>>>
>>>     env.execute("this job")
>>>   }
>>>
>>>   def get3TupleDataStream(env: StreamExecutionEnvironment):
>>> DataStream[(Int, Long, String)] = {
>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>     data.+=((1, 1L, "Hi"))
>>>     data.+=((2, 2L, "Hello"))
>>>     data.+=((3, 2L, "Hello world"))
>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>     data.+=((5, 3L, "I am fine."))
>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>     env.fromCollection(data)
>>>   }
>>>
>>>
>>> Nico
>>>
>>> On 16/03/18 22:50, karim amer wrote:
>>> > Hi There,
>>> >
>>> >  I am trying to write a CSVsink to disk but it's not getting written. I
>>> > think the file is getting overwritten or truncated once The Stream
>>> > process finishes. Does anyone know why the file is getting overwritten
>>> > or truncated and how can i fix this ?
>>> >
>>> >
>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>> >
>>> > val results = tableEnv.sqlQuery( """
>>> > |SELECT
>>> > | A
>>> > | FROM table
>>> > """.stripMargin)
>>> >
>>> >
>>> >
>>> > val result: Table = results
>>> >
>>> > val path = "file:///path/test/1.txt"
>>> > val sink :TableSink[Row]=   new CsvTableSink(
>>> >   path, // output path
>>> > fieldDelim = "|", // optional: delimit files by '|'
>>> > numFiles = 1, // optional: write to a single file
>>> > writeMode = WriteMode.NO_OVERWRITE)
>>> >
>>> > result.writeToSink(sink)
>>> >
>>> > env.execute("this job")
>>> >
>>> >
>>> >
>>> >
>>> > Thanks
>>>
>>>
>>
>>
>> --
>> karim amer
>>
>>
>


-- 
karim amer

Re: CsvSink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Karim,

I cannot find the method invocation "tableEnv.registerDataStream("myTable2",
set, 'A, 'B, 'C )" from the error message anywhere in your example code.
It would help if you kept the error message and the code consistent.
Otherwise it's not possible to figure out what's going on.

Best, Fabian
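
A note for readers who hit the same compiler error: "found: Symbol, required: Expression" usually indicates that the Table API's implicit conversions are not in scope. The following is a minimal sketch, assuming Flink 1.4/1.5 with the Scala Table API on the classpath. The object name and the stream contents are illustrative and do not come from the thread.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
// Brings the implicit Symbol -> Expression conversions into scope,
// so that 'A, 'B, 'C are accepted as field expressions.
import org.apache.flink.table.api.scala._

object RegisterFieldsExample { // illustrative name
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Illustrative input; any DataStream of 3-tuples would do.
    val set: DataStream[(Int, Long, String)] =
      env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"))

    // Compiles only when the table.api.scala implicits are imported.
    tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C)
  }
}
```

Without the `org.apache.flink.table.api.scala._` import, the symbols stay plain `scala.Symbol` values and the call does not type-check.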


Re: CsvSink

Posted by karim amer <ka...@gmail.com>.
Hi Nico,

I tried to reproduce your code, but registerDataStream keeps failing to
register the fields even though I am following your code and the docs.
Here is the error:
[error]  found   : Symbol
[error]  required: org.apache.flink.table.expressions.Expression
[error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
[error]
I think my code snippet was misleading. Here is the full snippet. Changing
the name from 'table' didn't fix it for me.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row



object datastreamtotableapi {

  case class Calls(a: String,
                   b: String,
                   c: String,
                   d: String,
                   e: String,
                   f: String,
                   g: String,
                   h: String,
                   i: String,
                   j: String,
                   k: String,
                   l: String,
                   m: String,
                   n: String,
                   p: String,
                   q: String,
                   r: String,
                   s: String,
                   t: String,
                   v: String,
                   w: String)


  def main(args: Array[String]) {

    val params = ParameterTool.fromArgs(args)
    val input = params.getRequired("input")


    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataStream = env.readTextFile(input)

    val namedStream = dataStream.map((value:String) => {

      val columns = value.split(",")
      Calls(columns(0), columns(1),columns(2),columns(3), columns(4),columns(5),
        columns(6), columns(7),columns(8),columns(9), columns(10), columns(11),
        columns(12), columns(13),columns(14),columns(15), columns(16), columns(17),
        columns(18),columns(19), columns(20)
      )
    })


   val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")

   val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
     new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
       override def extractTimestamp(element: Calls): Long =
         (element.j.concat(element.k)).toLong
     })



    tableEnv.registerDataStream("CDRS", watermarkedStream)
    val results = tableEnv.sqlQuery( """
                                      |SELECT
                                      | a
                                      | FROM CDRS
                                       """.stripMargin)


    val result: Table = results

    val path = "file:///Users/test/1.txt"
    val sink :TableSink[Row]=   new CsvTableSink(
      path,                             // output path
      fieldDelim = "|",                 // optional: delimit files by '|'
      numFiles = 1,                     // optional: write to a single file
      writeMode = WriteMode.OVERWRITE)

    result.writeToSink(sink)


    env.execute("this job")

  }
}






-- 
karim amer
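
One detail in the snippet above may be worth checking. This is an observation on the code as posted, not a confirmed diagnosis. The two filters keep only records whose j and k fields both equal " ", and extractTimestamp then calls toLong on their concatenation, which cannot be parsed as a number. A plain-Scala sketch of that behaviour follows; `parseTimestamp` is a hypothetical helper that mirrors extractTimestamp, and the sample values are made up.

```scala
import scala.util.Try

object TimestampCheck {
  // Hypothetical helper mirroring extractTimestamp from the snippet:
  // concatenates the two fields and tries to read the result as a Long.
  def parseTimestamp(j: String, k: String): Option[Long] =
    Try(j.concat(k).toLong).toOption

  def main(args: Array[String]): Unit = {
    // Records that pass filter(_.j == " ").filter(_.k == " ") look like this:
    println(parseTimestamp(" ", " "))             // None: "  ".toLong throws NumberFormatException
    // Made-up numeric fields parse fine:
    println(parseTimestamp("20180316", "215014")) // Some(20180316215014)
  }
}
```

If the intent was to drop blank fields, the filters would need `!=` rather than `==`.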

Re: CsvSink

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Karim,
When I tried to reproduce your code, I got an exception because the name
'table' was used. After replacing it and completing the job with some
input, I did see the CSV file appear. Also, the job crashed when the file
1.txt already existed.

The code I used (running Flink 1.5-SNAPSHOT):

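import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._ // implicit Symbol -> Expression conversions
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

import scala.collection.mutable

// The object name is illustrative; the original mail showed only the two methods.
object CsvSinkExample {

  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // use the second tuple field as an ascending event timestamp
    val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
      .assignAscendingTimestamps(_._2)
    tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)

    val results = tableEnv.sqlQuery( """
                                       |SELECT
                                       | A,C
                                       | FROM mytable
                                     """.stripMargin)

    val result: Table = results

    val path = "file:///tmp/test/1.txt"
    val sink :TableSink[Row]=   new CsvTableSink(
      path,                             // output path
      fieldDelim = "|",                 // optional: delimit files by '|'
      numFiles = 1,                     // optional: write to a single file
      writeMode = WriteMode.NO_OVERWRITE)

    result.writeToSink(sink)

    env.execute("this job")
  }

  // small in-memory test input
  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
    val data = new mutable.MutableList[(Int, Long, String)]
    data.+=((1, 1L, "Hi"))
    data.+=((2, 2L, "Hello"))
    data.+=((3, 2L, "Hello world"))
    data.+=((4, 3L, "Hello world, how are you?"))
    data.+=((5, 3L, "I am fine."))
    data.+=((6, 3L, "Luke Skywalker"))
    env.fromCollection(data)
  }
}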


Nico
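
The crash mentioned above is consistent with WriteMode.NO_OVERWRITE refusing to write to a path that already exists. A minimal pre-flight sketch in plain Scala (no Flink dependency) checks whether the target of a file:// URI is already present on a local, Unix-like filesystem before the job is submitted. `outputExists` is a hypothetical helper, not a Flink API.

```scala
import java.net.URI
import java.nio.file.{Files, Paths}

object OutputPathCheck {
  // Hypothetical helper (not part of Flink): true if the local file
  // or directory behind a file:// URI already exists.
  def outputExists(uri: String): Boolean =
    Files.exists(Paths.get(URI.create(uri)))

  def main(args: Array[String]): Unit = {
    val path = "file:///tmp/test/1.txt" // same path as in the example above
    if (outputExists(path))
      println(s"$path already exists; delete it or use WriteMode.OVERWRITE")
    else
      println(s"$path does not exist yet")
  }
}
```

Running such a check first makes it easier to tell a leftover output file apart from a problem in the job itself.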
