You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2018/08/07 22:02:32 UTC

Re: Working out through individual messages in Flink

Hi Fabian,

Reading your notes above I have converted the table back to DataStream.

    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker,
'timeissued, 'price)

           val key =
tableEnv.scan("priceTable").select('key).toDataStream[Row]
           val ticker =
tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
           val timeissued =
tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
           val price =
tableEnv.scan("priceTable").select('price).toDataStream[Row]

My intension is to create an Hbase sink as follows:

            // Save prices to Hbase table
             var p = new Put(new String(key).getBytes())
             p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),
new String(ticker).getBytes())
             p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(),     new
String(timeissued).getBytes())
             p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),
new String(priceToString).getBytes())
             p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),
new String(CURRENCY).getBytes())
             p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),
new String(1.toString).getBytes())
             p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),
new String(System.currentTimeMillis.toString).getBytes())
             HbaseTable.put(p)
             HbaseTable.flushCommits()

However, I don't seem to be able to get the correct values for the columns!

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 30 Jul 2018 at 09:58, Fabian Hueske <fh...@gmail.com> wrote:

> A *Table*Source [1], is a special input connector for Flink's relational
> APIs (Table API and SQL) [2].
> You can transform and filter with these APIs as well (it's probably even
> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>
> However, there is no *Table*Sink for HBase and you would need to convert
> the Table back to a DataStream [3].
> That's not very difficult since the APIs are integrated with each other.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>
> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh <mi...@gmail.com>:
>
>> Thanks Fabian. That was very useful.
>>
>> How about an operation like below?
>>
>>          // create builder
>>          val KafkaTableSource = Kafka011JsonTableSource.builder()
>>          // set Kafka topic
>>             .forTopic(topicsValue)
>>          // set Kafka consumer properties
>>             .withKafkaProperties(properties)
>>          // set Table schema
>>         .withSchema(TableSchema.builder()
>>         .field("key", Types.STRING)
>>         .field("ticker", Types.STRING)
>>         .field("timeissued", Types.STRING)
>>         .field("price", Types.FLOAT)
>>         .build())
>>
>> Will that be OK?
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Flink processes streams record by record, instead of micro-batching
>>> records together. Since every record comes by itself, there is no for-each.
>>> Simple record-by-record transformations can be done with a MapFunction,
>>> filtering out records with a FilterFunction. You can also implement a
>>> FlatMapFunction to do both in one step.
>>>
>>> Once the stream is transformed and filtered, you can write it to HBase
>>> with a sink function.
>>>
>>>
>>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mi...@gmail.com>:
>>>
>>>> Just to clarify these are the individual prices separated by ','. The
>>>> below shows three price lines in the topic
>>>>
>>>> UUID,                            Security,         Time,        Price
>>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> I have a Kafka topic that transmits 100 security prices ever 2
>>>>> seconds.
>>>>>
>>>>> In Spark streaming I go through the RDD and walk through rows one by
>>>>> one and check prices
>>>>> In prices are valuable I store them into an Hbase table
>>>>>
>>>>>     val dstream = KafkaUtils.createDirectStream[String, String,
>>>>> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>>>>     dstream.cache()
>>>>>     dstream.foreachRDD
>>>>>     { pricesRDD =>
>>>>>           // Work on individual messages
>>>>>       *   for(line <- pricesRDD.collect.toArray)*
>>>>>          {
>>>>>            var key = line._2.split(',').view(0).toString
>>>>>            var ticker =  line._2.split(',').view(1).toString
>>>>>            var timeissued = line._2.split(',').view(2).toString
>>>>>            var price = line._2.split(',').view(3).toFloat
>>>>>            val priceToString = line._2.split(',').view(3)
>>>>>             if (price > 90.0)
>>>>>            {
>>>>>                //save to Hbase table
>>>>>            }
>>>>>           }
>>>>>      }
>>>>>
>>>>> That works fine.
>>>>>
>>>>> In Flink I define my source as below
>>>>>
>>>>>     val streamExecEnv =
>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>
>>>>> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>     val stream = streamExecEnv
>>>>>        .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
>>>>> SimpleStringSchema(), properties))
>>>>>
>>>>> Is there anyway I can perform similar operation in Flink? I need to go
>>>>> through every topic load sent and look at the individual rows/ For example
>>>>> what is the equivalent of
>>>>>
>>>>> *for(line <- pricesRDD.collect.toArray)*
>>>>> In flink?
>>>>>
>>>>> Thanks
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>
>>>
>

Re: Working out through individual messages in Flink

Posted by Jörn Franke <jo...@gmail.com>.
(At the end of your code)

> On 8. Aug 2018, at 00:29, Jörn Franke <jo...@gmail.com> wrote:
> 
> Hi Mich,
> 
> Would it be possible to share the full source code ?
> I am missing a call to streamExecEnvironment.execute
> 
> Best regards 
> 
>> On 8. Aug 2018, at 00:02, Mich Talebzadeh <mi...@gmail.com> wrote:
>> 
>> Hi Fabian,
>> 
>> Reading your notes above I have converted the table back to DataStream.
>> 
>>     val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>     tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
>> 
>>            val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
>>            val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>>            val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>>            val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>> 
>> My intension is to create an Hbase sink as follows:
>> 
>>             // Save prices to Hbase table
>>              var p = new Put(new String(key).getBytes())
>>              p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),          new String(ticker).getBytes())
>>              p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(),     new String(timeissued).getBytes())
>>              p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),           new String(priceToString).getBytes())
>>              p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),         new String(CURRENCY).getBytes())
>>              p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),         new String(1.toString).getBytes())
>>              p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),         new String(System.currentTimeMillis.toString).getBytes())
>>              HbaseTable.put(p)
>>              HbaseTable.flushCommits()
>> 
>> However, I don't seem to be able to get the correct values for the columns!
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> http://talebzadehmich.wordpress.com
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>  
>> 
>> 
>>> On Mon, 30 Jul 2018 at 09:58, Fabian Hueske <fh...@gmail.com> wrote:
>>> A *Table*Source [1], is a special input connector for Flink's relational APIs (Table API and SQL) [2].
>>> You can transform and filter with these APIs as well (it's probably even easier). In SQL this would be the SELECT and WHERE clauses of a query.
>>> 
>>> However, there is no *Table*Sink for HBase and you would need to convert the Table back to a DataStream [3].
>>> That's not very difficult since the APIs are integrated with each other.
>>> 
>>> Best, Fabian
>>> 
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>> 
>>> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh <mi...@gmail.com>:
>>>> Thanks Fabian. That was very useful.
>>>> 
>>>> How about an operation like below?
>>>> 
>>>>          // create builder
>>>>          val KafkaTableSource = Kafka011JsonTableSource.builder()
>>>>          // set Kafka topic
>>>>             .forTopic(topicsValue)
>>>>          // set Kafka consumer properties
>>>>             .withKafkaProperties(properties)
>>>>          // set Table schema
>>>>         .withSchema(TableSchema.builder()
>>>>         .field("key", Types.STRING)
>>>>         .field("ticker", Types.STRING)
>>>>         .field("timeissued", Types.STRING)
>>>>         .field("price", Types.FLOAT)
>>>>         .build())
>>>> 
>>>> Will that be OK? 
>>>> 
>>>> 
>>>> Dr Mich Talebzadeh
>>>>  
>>>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>  
>>>> http://talebzadehmich.wordpress.com
>>>> 
>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>  
>>>> 
>>>> 
>>>>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske <fh...@gmail.com> wrote:
>>>>> Hi,
>>>>> 
>>>>> Flink processes streams record by record, instead of micro-batching records together. Since every record comes by itself, there is no for-each.
>>>>> Simple record-by-record transformations can be done with a MapFunction, filtering out records with a FilterFunction. You can also implement a FlatMapFunction to do both in one step.
>>>>> 
>>>>> Once the stream is transformed and filtered, you can write it to HBase with a sink function.
>>>>> 
>>>>> 
>>>>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mi...@gmail.com>:
>>>>>> Just to clarify these are the individual prices separated by ','. The below shows three price lines in the topic
>>>>>> 
>>>>>> UUID,                            Security,         Time,        Price
>>>>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>>>>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>>>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>>>> 
>>>>>> 
>>>>>> Dr Mich Talebzadeh
>>>>>>  
>>>>>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>  
>>>>>> http://talebzadehmich.wordpress.com
>>>>>> 
>>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>>>  
>>>>>> 
>>>>>> 
>>>>>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> I have a Kafka topic that transmits 100 security prices ever 2 seconds. 
>>>>>>> 
>>>>>>> In Spark streaming I go through the RDD and walk through rows one by one and check prices
>>>>>>> In prices are valuable I store them into an Hbase table
>>>>>>> 
>>>>>>>     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>>>>>>     dstream.cache()
>>>>>>>     dstream.foreachRDD
>>>>>>>     { pricesRDD =>
>>>>>>>           // Work on individual messages
>>>>>>>          for(line <- pricesRDD.collect.toArray)
>>>>>>>          {
>>>>>>>            var key = line._2.split(',').view(0).toString
>>>>>>>            var ticker =  line._2.split(',').view(1).toString
>>>>>>>            var timeissued = line._2.split(',').view(2).toString
>>>>>>>            var price = line._2.split(',').view(3).toFloat
>>>>>>>            val priceToString = line._2.split(',').view(3)
>>>>>>>             if (price > 90.0)
>>>>>>>            {
>>>>>>>                //save to Hbase table
>>>>>>>            }
>>>>>>>           }
>>>>>>>      }
>>>>>>> 
>>>>>>> That works fine. 
>>>>>>> 
>>>>>>> In Flink I define my source as below
>>>>>>> 
>>>>>>>     val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>>     val stream = streamExecEnv
>>>>>>>        .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>>>>>> 
>>>>>>> Is there anyway I can perform similar operation in Flink? I need to go through every topic load sent and look at the individual rows/ For example what is the equivalent of 
>>>>>>> 
>>>>>>> for(line <- pricesRDD.collect.toArray)
>>>>>>> In flink?
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Dr Mich Talebzadeh
>>>>>>>  
>>>>>>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>  
>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>> 
>>>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>>>>  
>>>>> 
>>> 

Re: Working out through individual messages in Flink

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Jorn,

Thanks I uploaded the Scala code to my GitHub  --> md_streaming.scala

https://github.com/michTalebzadeh/Flink

Regards,

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 7 Aug 2018 at 23:29, Jörn Franke <jo...@gmail.com> wrote:

> Hi Mich,
>
> Would it be possible to share the full source code ?
> I am missing a call to streamExecEnvironment.execute
>
> Best regards
>
> On 8. Aug 2018, at 00:02, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> Hi Fabian,
>
> Reading your notes above I have converted the table back to DataStream.
>
>     val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>     tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker,
> 'timeissued, 'price)
>
>            val key =
> tableEnv.scan("priceTable").select('key).toDataStream[Row]
>            val ticker =
> tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>            val timeissued =
> tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>            val price =
> tableEnv.scan("priceTable").select('price).toDataStream[Row]
>
> My intension is to create an Hbase sink as follows:
>
>             // Save prices to Hbase table
>              var p = new Put(new String(key).getBytes())
>              p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),
> new String(ticker).getBytes())
>              p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(),     new
> String(timeissued).getBytes())
>              p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),
> new String(priceToString).getBytes())
>              p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),
> new String(CURRENCY).getBytes())
>              p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),
> new String(1.toString).getBytes())
>              p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),
> new String(System.currentTimeMillis.toString).getBytes())
>              HbaseTable.put(p)
>              HbaseTable.flushCommits()
>
> However, I don't seem to be able to get the correct values for the columns!
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 30 Jul 2018 at 09:58, Fabian Hueske <fh...@gmail.com> wrote:
>
>> A *Table*Source [1], is a special input connector for Flink's relational
>> APIs (Table API and SQL) [2].
>> You can transform and filter with these APIs as well (it's probably even
>> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>>
>> However, there is no *Table*Sink for HBase and you would need to convert
>> the Table back to a DataStream [3].
>> That's not very difficult since the APIs are integrated with each other.
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>
>> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh <mi...@gmail.com>:
>>
>>> Thanks Fabian. That was very useful.
>>>
>>> How about an operation like below?
>>>
>>>          // create builder
>>>          val KafkaTableSource = Kafka011JsonTableSource.builder()
>>>          // set Kafka topic
>>>             .forTopic(topicsValue)
>>>          // set Kafka consumer properties
>>>             .withKafkaProperties(properties)
>>>          // set Table schema
>>>         .withSchema(TableSchema.builder()
>>>         .field("key", Types.STRING)
>>>         .field("ticker", Types.STRING)
>>>         .field("timeissued", Types.STRING)
>>>         .field("price", Types.FLOAT)
>>>         .build())
>>>
>>> Will that be OK?
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink processes streams record by record, instead of micro-batching
>>>> records together. Since every record comes by itself, there is no for-each.
>>>> Simple record-by-record transformations can be done with a MapFunction,
>>>> filtering out records with a FilterFunction. You can also implement a
>>>> FlatMapFunction to do both in one step.
>>>>
>>>> Once the stream is transformed and filtered, you can write it to HBase
>>>> with a sink function.
>>>>
>>>>
>>>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mi...@gmail.com>:
>>>>
>>>>> Just to clarify these are the individual prices separated by ','. The
>>>>> below shows three price lines in the topic
>>>>>
>>>>> UUID,                            Security,         Time,        Price
>>>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>>>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <
>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a Kafka topic that transmits 100 security prices ever 2
>>>>>> seconds.
>>>>>>
>>>>>> In Spark streaming I go through the RDD and walk through rows one by
>>>>>> one and check prices
>>>>>> In prices are valuable I store them into an Hbase table
>>>>>>
>>>>>>     val dstream = KafkaUtils.createDirectStream[String, String,
>>>>>> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>>>>>     dstream.cache()
>>>>>>     dstream.foreachRDD
>>>>>>     { pricesRDD =>
>>>>>>           // Work on individual messages
>>>>>>       *   for(line <- pricesRDD.collect.toArray)*
>>>>>>          {
>>>>>>            var key = line._2.split(',').view(0).toString
>>>>>>            var ticker =  line._2.split(',').view(1).toString
>>>>>>            var timeissued = line._2.split(',').view(2).toString
>>>>>>            var price = line._2.split(',').view(3).toFloat
>>>>>>            val priceToString = line._2.split(',').view(3)
>>>>>>             if (price > 90.0)
>>>>>>            {
>>>>>>                //save to Hbase table
>>>>>>            }
>>>>>>           }
>>>>>>      }
>>>>>>
>>>>>> That works fine.
>>>>>>
>>>>>> In Flink I define my source as below
>>>>>>
>>>>>>     val streamExecEnv =
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>
>>>>>> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>     val stream = streamExecEnv
>>>>>>        .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
>>>>>> SimpleStringSchema(), properties))
>>>>>>
>>>>>> Is there anyway I can perform similar operation in Flink? I need to
>>>>>> go through every topic load sent and look at the individual rows/ For
>>>>>> example what is the equivalent of
>>>>>>
>>>>>> *for(line <- pricesRDD.collect.toArray)*
>>>>>> In flink?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>

Re: Working out through individual messages in Flink

Posted by Jörn Franke <jo...@gmail.com>.
Hi Mich,

Would it be possible to share the full source code ?
I am missing a call to streamExecEnvironment.execute

Best regards 

> On 8. Aug 2018, at 00:02, Mich Talebzadeh <mi...@gmail.com> wrote:
> 
> Hi Fabian,
> 
> Reading your notes above I have converted the table back to DataStream.
> 
>     val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>     tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
> 
>            val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
>            val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>            val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>            val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
> 
> My intension is to create an Hbase sink as follows:
> 
>             // Save prices to Hbase table
>              var p = new Put(new String(key).getBytes())
>              p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),          new String(ticker).getBytes())
>              p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(),     new String(timeissued).getBytes())
>              p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),           new String(priceToString).getBytes())
>              p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),         new String(CURRENCY).getBytes())
>              p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),         new String(1.toString).getBytes())
>              p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),         new String(System.currentTimeMillis.toString).getBytes())
>              HbaseTable.put(p)
>              HbaseTable.flushCommits()
> 
> However, I don't seem to be able to get the correct values for the columns!
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>  
> 
> 
>> On Mon, 30 Jul 2018 at 09:58, Fabian Hueske <fh...@gmail.com> wrote:
>> A *Table*Source [1], is a special input connector for Flink's relational APIs (Table API and SQL) [2].
>> You can transform and filter with these APIs as well (it's probably even easier). In SQL this would be the SELECT and WHERE clauses of a query.
>> 
>> However, there is no *Table*Sink for HBase and you would need to convert the Table back to a DataStream [3].
>> That's not very difficult since the APIs are integrated with each other.
>> 
>> Best, Fabian
>> 
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>> 
>> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh <mi...@gmail.com>:
>>> Thanks Fabian. That was very useful.
>>> 
>>> How about an operation like below?
>>> 
>>>          // create builder
>>>          val KafkaTableSource = Kafka011JsonTableSource.builder()
>>>          // set Kafka topic
>>>             .forTopic(topicsValue)
>>>          // set Kafka consumer properties
>>>             .withKafkaProperties(properties)
>>>          // set Table schema
>>>         .withSchema(TableSchema.builder()
>>>         .field("key", Types.STRING)
>>>         .field("ticker", Types.STRING)
>>>         .field("timeissued", Types.STRING)
>>>         .field("price", Types.FLOAT)
>>>         .build())
>>> 
>>> Will that be OK? 
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> http://talebzadehmich.wordpress.com
>>> 
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>  
>>> 
>>> 
>>>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske <fh...@gmail.com> wrote:
>>>> Hi,
>>>> 
>>>> Flink processes streams record by record, instead of micro-batching records together. Since every record comes by itself, there is no for-each.
>>>> Simple record-by-record transformations can be done with a MapFunction, filtering out records with a FilterFunction. You can also implement a FlatMapFunction to do both in one step.
>>>> 
>>>> Once the stream is transformed and filtered, you can write it to HBase with a sink function.
>>>> 
>>>> 
>>>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mi...@gmail.com>:
>>>>> Just to clarify these are the individual prices separated by ','. The below shows three price lines in the topic
>>>>> 
>>>>> UUID,                            Security,         Time,        Price
>>>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>>>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>>> 
>>>>> 
>>>>> Dr Mich Talebzadeh
>>>>>  
>>>>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>  
>>>>> http://talebzadehmich.wordpress.com
>>>>> 
>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>>  
>>>>> 
>>>>> 
>>>>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> I have a Kafka topic that transmits 100 security prices ever 2 seconds. 
>>>>>> 
>>>>>> In Spark streaming I go through the RDD and walk through rows one by one and check prices
>>>>>> In prices are valuable I store them into an Hbase table
>>>>>> 
>>>>>>     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>>>>>     dstream.cache()
>>>>>>     dstream.foreachRDD
>>>>>>     { pricesRDD =>
>>>>>>           // Work on individual messages
>>>>>>          for(line <- pricesRDD.collect.toArray)
>>>>>>          {
>>>>>>            var key = line._2.split(',').view(0).toString
>>>>>>            var ticker =  line._2.split(',').view(1).toString
>>>>>>            var timeissued = line._2.split(',').view(2).toString
>>>>>>            var price = line._2.split(',').view(3).toFloat
>>>>>>            val priceToString = line._2.split(',').view(3)
>>>>>>             if (price > 90.0)
>>>>>>            {
>>>>>>                //save to Hbase table
>>>>>>            }
>>>>>>           }
>>>>>>      }
>>>>>> 
>>>>>> That works fine. 
>>>>>> 
>>>>>> In Flink I define my source as below
>>>>>> 
>>>>>>     val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>     val stream = streamExecEnv
>>>>>>        .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>>>>> 
>>>>>> Is there anyway I can perform similar operation in Flink? I need to go through every topic load sent and look at the individual rows/ For example what is the equivalent of 
>>>>>> 
>>>>>> for(line <- pricesRDD.collect.toArray)
>>>>>> In flink?
>>>>>> 
>>>>>> Thanks
>>>>>> Dr Mich Talebzadeh
>>>>>>  
>>>>>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>  
>>>>>> http://talebzadehmich.wordpress.com
>>>>>> 
>>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>>>  
>>>> 
>>