Posted to user@flink.apache.org by sri hari kali charan Tummala <ka...@gmail.com> on 2019/07/15 19:32:33 UTC

Fwd: Stream to CSV Sink with SQL Distinct Values

Hi All,

I am trying to read data from a Kinesis stream, apply a SQL
transformation (distinct), and then write to a CSV sink, which is
failing with this issue (org.apache.flink.table.api.TableException:
AppendStreamTableSink requires that Table has only insert changes.). The full
code is here (
https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
).

Can anyone help me move forward on this issue?

Full Code:-

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))

val mapFunction: MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]] =
  new MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]]() {

    override def map(s: String): Tuple10[String, String, String,String,String,String,String,String,String,String] = {
      val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

      val csvData = data.getCc_num+","+
        data.getFirst+","+
        data.getLast+","+
        data.getTrans_num+","+
        data.getTrans_time+","+
        data.getCategory+","+
        data.getMerchant+","+
        data.getAmt+","+
        data.getMerch_lat+","+
        data.getMerch_long

      //println(csvData)

      val p:Array[String] = csvData.split(",")
      var cc_num:String = p(0)
      var first:String = p(1)
      var last:String = p(2)
      var trans_num:String = p(3)
      var trans_time:String = p(4)
      var category:String = p(5)
      var merchant:String = p(6)
      var amt:String = p(7)
      var merch_lat:String = p(8)
      var merch_long:String = p(9)

      val creationDate: Time = new Time(System.currentTimeMillis())
      return new Tuple10(cc_num, first, last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
    }
  }

val data = kinesis.map(mapFunction)

//data.print()

tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)

val table1 = table.distinct()

tEnv.registerTable("fromAnotherTable",table1)

table.printSchema()

val csvSink:TableSink[Row]  = new CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput","~");
val fieldNames:Array[String]              = Array("cc_num","first_column","last_column","trans_num","trans_time","category_column","merchant_column","amt_column","merch_lat","merch_long")
val fieldTypes:Array[TypeInformation[_]]  = Array(
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING
)

tEnv.registerTableSink("s3csvTargetTransaction", fieldNames, fieldTypes, csvSink)

tEnv.sqlUpdate("INSERT INTO s3csvTargetTransaction SELECT cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long from fromAnotherTable")


-- 
Thanks & Regards
Sri Tummala

Re: Stream to CSV Sink with SQL Distinct Values

Posted by sri hari kali charan Tummala <ka...@gmail.com>.
Hi Weng,

now I am hitting another issue (Exception in thread "main"
org.apache.flink.table.api.TableException: Only tables that originate from
Scala DataStreams can be converted to Scala DataStreams.). Here is the full
code:
https://github.com/kali786516/FlinkStreamAndSql/blob/15e5e60d6c044bc830f5ef2d79c071389e7460d1/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L128
and the pom: https://github.com/kali786516/FlinkStreamAndSql/blob/master/pom.xml.

Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
    at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:100)
    at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:126)
    at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)

tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)

table.printSchema()

import org.apache.flink.streaming.api.scala._

val test1 = tEnv.sqlQuery(query).distinct().toAppendStream[Row]

test1.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3")
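
For reference, a minimal, self-contained all-Scala sketch of the same conversion
(assuming Flink 1.8.x; the inline sample data, field names and object name are made up
for illustration). The exception above typically comes from mixing the Java and Scala
Table/DataStream APIs, so this sketch keeps everything on the Scala side, and it uses
toRetractStream since DISTINCT may produce updates:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object ScalaTableApiSketch {
  def main(args: Array[String]): Unit = {
    // Scala StreamExecutionEnvironment, so the registered stream is a Scala DataStream.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The overload taking a Scala environment returns the Scala StreamTableEnvironment.
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Stand-in for the Kinesis source: two made-up transactions.
    val data: DataStream[(String, String)] =
      env.fromElements(("180094108369013", "John"), ("180094108369013", "John"))

    tEnv.registerDataStream("transactions", data, 'cc_num, 'first_column)

    val table = tEnv.sqlQuery("SELECT DISTINCT cc_num, first_column FROM transactions")

    // DISTINCT may update its result, so convert to a retract stream and keep
    // only the accumulate (true) messages.
    val out: DataStream[Row] = table.toRetractStream[Row].filter(_._1).map(_._2)

    out.print()
    env.execute("scala-table-api-sketch")
  }
}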


On Mon, Jul 15, 2019 at 9:52 PM Caizhi Weng <ts...@gmail.com> wrote:

> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, it's considered by the planner that it
> might update or retract records (although from my perspective it won't...).
> Because csv table sink is an append only sink (it's hard to update what has
> been written in the middle of a file), the exception you mentioned occurs.
>
> However, you can use `toAppendStream` method to change the retractable
> stream to an append only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get
> an append only stream. You can then add csv sink to this stream.
>
> sri hari kali charan Tummala <ka...@gmail.com> wrote on Tue, Jul 16, 2019 at 3:32 AM:
>
>> [quoted original message snipped]

-- 
Thanks & Regards
Sri Tummala

Re: Stream to CSV Sink with SQL Distinct Values

Posted by sri hari kali charan Tummala <ka...@gmail.com>.
Hi Lee,

I did try

Option 1:-
It writes to the CSV file only if I kill the running job.

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
  .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3",
  FileSystem.WriteMode.OVERWRITE,"~","|")

Output:
2> (true,180094108369013,John,Holland,c1ad7a1b73172ef67bd24820438f3f93,2019-07-15 22:48:40,travel,Satterfield-Lowe,81,39.015861,-119.883595)

Option 2:-
I tried several options and this workaround kind of works, but I still
need to strip the brackets, the true/false flag, etc.

import java.io.PrintStream
val fileOut = new PrintStream("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut2/out.txt")

System.setOut(fileOut)

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).print()

System.out.println(tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).print())
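
As a sketch of that cleanup (assuming the Scala StreamTableEnvironment and the same
`table`; the "~" delimiter and the csvOut5 path below are just placeholders): keep only
the accumulate messages and flatten each Row into a delimited line yourself, so neither
the brackets nor the true/false flag reach the file.

import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// toRetractStream gives (flag, row) pairs; flag == true is an accumulate (insert)
// message, flag == false retracts a previously emitted row.
val csvLines = table
  .toRetractStream[Row]
  .filter(_._1)                                             // drop retract messages
  .map { case (_, row) =>
    (0 until row.getArity).map(row.getField).mkString("~")  // Row -> "a~b~c..."
  }

csvLines.writeAsText(
  "/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut5",
  FileSystem.WriteMode.OVERWRITE)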


On Mon, Jul 15, 2019 at 10:03 PM JingsongLee <lz...@aliyun.com>
wrote:

> Hi caizhi and kali:
>
> I think this table should use toRetractStream instead of toAppendStream,
> and you should handle the retract messages. (If you just use distinct, the
> message should always be accumulate message)
>
> Best, JingsongLee
>
> ------------------------------------------------------------------
> From:Caizhi Weng <ts...@gmail.com>
> Send Time: Tue, Jul 16, 2019 09:52
> To:sri hari kali charan Tummala <ka...@gmail.com>
> Cc:user <us...@flink.apache.org>
> Subject:Re: Stream to CSV Sink with SQL Distinct Values
>
> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, it's considered by the planner that it
> might update or retract records (although from my perspective it won't...).
> Because csv table sink is an append only sink (it's hard to update what has
> been written in the middle of a file), the exception you mentioned occurs.
>
> However, you can use `toAppendStream` method to change the retractable
> stream to an append only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get
> an append only stream. You can then add csv sink to this stream.
>
> sri hari kali charan Tummala <ka...@gmail.com> wrote on Tue, Jul 16, 2019 at 3:32 AM:
> [quoted original message snipped]

-- 
Thanks & Regards
Sri Tummala

Re: Stream to CSV Sink with SQL Distinct Values

Posted by sri hari kali charan Tummala <ka...@gmail.com>.
Hi Lee,

It writes only after the job is killed, and I also don't see all the
records. Is there a workaround?

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
  .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
    FileSystem.WriteMode.NO_OVERWRITE,"~","|")
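
If the goal is mainly to see rows while the job is still running (for local debugging),
one rough workaround is a sink that flushes after every record, since the
writeAsCsv/writeAsText output formats typically only flush their buffers when they are
closed (job finish or cancel). This is only a sketch, not production code; the
FlushingFileSink class and the /tmp path are made up, and it assumes the Scala table
environment and the `table` from above:

import java.io.{FileWriter, PrintWriter}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Toy sink for local debugging: append each record to a file and flush immediately
// so rows become visible while the job is still running.
class FlushingFileSink(path: String) extends RichSinkFunction[String] {
  @transient private var out: PrintWriter = _

  override def open(parameters: Configuration): Unit =
    out = new PrintWriter(new FileWriter(path, true))

  override def invoke(value: String): Unit = {
    out.println(value)
    out.flush()
  }

  override def close(): Unit = if (out != null) out.close()
}

table.toRetractStream[Row]
  .filter(_._1)                 // keep accumulate messages only
  .map(_._2.toString)
  .addSink(new FlushingFileSink("/tmp/csvOutLive.txt"))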


On Mon, Jul 15, 2019 at 10:03 PM JingsongLee <lz...@aliyun.com>
wrote:

> Hi caizhi and kali:
>
> I think this table should use toRetractStream instead of toAppendStream,
> and you should handle the retract messages. (If you just use distinct, the
> message should always be accumulate message)
>
> Best, JingsongLee
>
> ------------------------------------------------------------------
> From:Caizhi Weng <ts...@gmail.com>
> Send Time: Tue, Jul 16, 2019 09:52
> To:sri hari kali charan Tummala <ka...@gmail.com>
> Cc:user <us...@flink.apache.org>
> Subject:Re: Stream to CSV Sink with SQL Distinct Values
>
> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, it's considered by the planner that it
> might update or retract records (although from my perspective it won't...).
> Because csv table sink is an append only sink (it's hard to update what has
> been written in the middle of a file), the exception you mentioned occurs.
>
> However, you can use `toAppendStream` method to change the retractable
> stream to an append only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get
> an append only stream. You can then add csv sink to this stream.
>
> sri hari kali charan Tummala <ka...@gmail.com> wrote on Tue, Jul 16, 2019 at 3:32 AM:
> [quoted original message snipped]

-- 
Thanks & Regards
Sri Tummala

Re: Stream to CSV Sink with SQL Distinct Values

Posted by JingsongLee <lz...@aliyun.com>.
Hi caizhi and kali:

I think this table should use toRetractStream instead of toAppendStream, and you should handle the retract messages. (If you just use distinct, the message should always be accumulate message)
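
For example (a minimal sketch assuming the Scala StreamTableEnvironment and the `table`
from the original code), the retract stream is a stream of (Boolean, Row) pairs, and
handling the messages just means reacting to the flag:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val retractStream: DataStream[(Boolean, Row)] = table.toRetractStream[Row]

retractStream.map {
  case (true, row)  => s"accumulate: $row"  // new or updated result row
  case (false, row) => s"retract: $row"     // previously emitted row is withdrawn
}.print()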

Best, JingsongLee


------------------------------------------------------------------
From:Caizhi Weng <ts...@gmail.com>
Send Time: Tue, Jul 16, 2019 09:52
To:sri hari kali charan Tummala <ka...@gmail.com>
Cc:user <us...@flink.apache.org>
Subject:Re: Stream to CSV Sink with SQL Distinct Values

Hi Kali,

Currently Flink treats all aggregate functions as retractable. As `distinct` is an aggregate function, it's considered by the planner that it might update or retract records (although from my perspective it won't...). Because csv table sink is an append only sink (it's hard to update what has been written in the middle of a file), the exception you mentioned occurs.

However, you can use `toAppendStream` method to change the retractable stream to an append only stream. For example, `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get an append only stream. You can then add csv sink to this stream.
sri hari kali charan Tummala <ka...@gmail.com> wrote on Tue, Jul 16, 2019 at 3:32 AM:

[quoted original message snipped]


Re: Stream to CSV Sink with SQL Distinct Values

Posted by Caizhi Weng <ts...@gmail.com>.
Hi Kali,

Currently Flink treats all aggregate functions as retractable. As
`distinct` is an aggregate function, it's considered by the planner that it
might update or retract records (although from my perspective it won't...).
Because csv table sink is an append only sink (it's hard to update what has
been written in the middle of a file), the exception you mentioned occurs.

However, you can use `toAppendStream` method to change the retractable
stream to an append only stream. For example,
`tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get
an append only stream. You can then add csv sink to this stream.

sri hari kali charan Tummala <ka...@gmail.com> wrote on Tue, Jul 16, 2019 at 3:32 AM:

> [quoted original message snipped]