You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2018/08/07 16:07:58 UTC

Passing the individual table coilumn values to the local variables

Hi,

The following works fine

   tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker,
'timeissued, 'price)
    val result = tableEnv.scan("priceTable").filter('ticker === "VOD" &&
'price > 99.0).select('key, 'ticker, 'timeissued, 'price)
    val r = result.toDataStream[Row]
    r.print()

Now I would like to get the individual column values from priceTable into
local variables

This does not seem to work

val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
 val timeissued =
tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
 val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]

What alternatives are there?

Thanks

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Re: Passing the individual table coilumn values to the local variables

Posted by vino yang <ya...@gmail.com>.
Hi Mich,

Here you need to understand that the print call does not print the value of
a field, it is actually a call to an output to STDOUT sink.
So, what you get here is not the value of a variable, please refer to the
hequn recommendation.

Thanks, vino.

Hequn Cheng <ch...@gmail.com> 于2018年8月8日周三 上午9:11写道:

> Hi Mich,
>
> We can't convert a DataStream to a value. There are some options:
> 1. Use a TableSink to write data[1] into Hbase.
> 2. Use a UDF[2].
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#user-defined-functions
>
> On Wed, Aug 8, 2018 at 2:22 AM, Mich Talebzadeh <mich.talebzadeh@gmail.com
> > wrote:
>
>> I need this operation to stored filtered rows in an Hbase table.
>>
>> I can access an existing Hbase table through flink API
>>
>> My challenge is to put rows into Hbase table. Something like below and I
>> don't seem to be able to extract individual column values from priceTable
>>
>>
>>
>>
>> *                    val key =
>> tableEnv.scan("priceTable").select('key).toDataStream[Row].print()
>> val ticker =
>> tableEnv.scan("priceTable").select('ticker).toDataStream[Row].print()
>> val timeissued =
>> tableEnv.scan("priceTable").select('timeissued).toDataStream[Row].print()
>> val price =
>> tableEnv.scan("priceTable").select('price).toDataStream[Row].print()*
>>            val CURRENCY = "GBP"
>>            val op_type = "1"
>>            val op_time = System.currentTimeMillis.toString
>> /*
>>            if (price > 99.0)
>>            {
>>             // Save prices to Hbase table
>>              var p = new Put(new String(key).getBytes())
>>              p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),
>> new String(ticker).getBytes())
>>              p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(),     new
>> String(timeissued).getBytes())
>>              p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),
>> new String(priceToString).getBytes())
>>              p.add("PRICE_INFO".getBytes(),
>> "CURRENCY".getBytes(),         new String(CURRENCY).getBytes())
>>              p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),
>> new String(1.toString).getBytes())
>>              p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),
>> new String(System.currentTimeMillis.toString).getBytes())
>>              HbaseTable.put(p)
>>              HbaseTable.flushCommits()
>>              if(tableEnv.scan("priceTable").filter('ticker == "VOD" &&
>> 'price > 99.0))
>>              {
>>                sqltext = Calendar.getInstance.getTime.toString + ", Price
>> on "+ticker+" hit " +price.toString
>>                //java.awt.Toolkit.getDefaultToolkit().beep()
>>                println(sqltext)
>>              }
>>            }
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 7 Aug 2018 at 17:07, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> The following works fine
>>>
>>>    tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker,
>>> 'timeissued, 'price)
>>>     val result = tableEnv.scan("priceTable").filter('ticker === "VOD" &&
>>> 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)
>>>     val r = result.toDataStream[Row]
>>>     r.print()
>>>
>>> Now I would like to get the individual column values from priceTable
>>> into local variables
>>>
>>> This does not seem to work
>>>
>>> val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
>>> val ticker =
>>> tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>>>  val timeissued =
>>> tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>>>  val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>>>
>>> What alternatives are there?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>
>

Re: Passing the individual table coilumn values to the local variables

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Mich,

We can't convert a DataStream to a value. There are some options:
1. Use a TableSink to write data[1] into Hbase.
2. Use a UDF[2].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#user-defined-functions

On Wed, Aug 8, 2018 at 2:22 AM, Mich Talebzadeh <mi...@gmail.com>
wrote:

> I need this operation to stored filtered rows in an Hbase table.
>
> I can access an existing Hbase table through flink API
>
> My challenge is to put rows into Hbase table. Something like below and I
> don't seem to be able to extract individual column values from priceTable
>
>
>
>
> *                    val key =
> tableEnv.scan("priceTable").select('key).toDataStream[Row].print()
> val ticker =
> tableEnv.scan("priceTable").select('ticker).toDataStream[Row].print()
> val timeissued =
> tableEnv.scan("priceTable").select('timeissued).toDataStream[Row].print()
> val price =
> tableEnv.scan("priceTable").select('price).toDataStream[Row].print()*
>            val CURRENCY = "GBP"
>            val op_type = "1"
>            val op_time = System.currentTimeMillis.toString
> /*
>            if (price > 99.0)
>            {
>             // Save prices to Hbase table
>              var p = new Put(new String(key).getBytes())
>              p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),
> new String(ticker).getBytes())
>              p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(),     new
> String(timeissued).getBytes())
>              p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),
> new String(priceToString).getBytes())
>              p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),
> new String(CURRENCY).getBytes())
>              p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),
> new String(1.toString).getBytes())
>              p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),
> new String(System.currentTimeMillis.toString).getBytes())
>              HbaseTable.put(p)
>              HbaseTable.flushCommits()
>              if(tableEnv.scan("priceTable").filter('ticker == "VOD" &&
> 'price > 99.0))
>              {
>                sqltext = Calendar.getInstance.getTime.toString + ", Price
> on "+ticker+" hit " +price.toString
>                //java.awt.Toolkit.getDefaultToolkit().beep()
>                println(sqltext)
>              }
>            }
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 7 Aug 2018 at 17:07, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Hi,
>>
>> The following works fine
>>
>>    tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker,
>> 'timeissued, 'price)
>>     val result = tableEnv.scan("priceTable").filter('ticker === "VOD" &&
>> 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)
>>     val r = result.toDataStream[Row]
>>     r.print()
>>
>> Now I would like to get the individual column values from priceTable into
>> local variables
>>
>> This does not seem to work
>>
>> val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
>> val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[
>> Row]
>>  val timeissued = tableEnv.scan("priceTable").select('timeissued).
>> toDataStream[Row]
>>  val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>>
>> What alternatives are there?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>

Re: Passing the individual table coilumn values to the local variables

Posted by Mich Talebzadeh <mi...@gmail.com>.
I need this operation to stored filtered rows in an Hbase table.

I can access an existing Hbase table through flink API

My challenge is to put rows into Hbase table. Something like below and I
don't seem to be able to extract individual column values from priceTable




*                    val key =
tableEnv.scan("priceTable").select('key).toDataStream[Row].print()
val ticker =
tableEnv.scan("priceTable").select('ticker).toDataStream[Row].print()
val timeissued =
tableEnv.scan("priceTable").select('timeissued).toDataStream[Row].print()
val price =
tableEnv.scan("priceTable").select('price).toDataStream[Row].print()*
           val CURRENCY = "GBP"
           val op_type = "1"
           val op_time = System.currentTimeMillis.toString
/*
           if (price > 99.0)
           {
            // Save prices to Hbase table
             var p = new Put(new String(key).getBytes())
             p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),
new String(ticker).getBytes())
             p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(),     new
String(timeissued).getBytes())
             p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),
new String(priceToString).getBytes())
             p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),
new String(CURRENCY).getBytes())
             p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),
new String(1.toString).getBytes())
             p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),
new String(System.currentTimeMillis.toString).getBytes())
             HbaseTable.put(p)
             HbaseTable.flushCommits()
             if(tableEnv.scan("priceTable").filter('ticker == "VOD" &&
'price > 99.0))
             {
               sqltext = Calendar.getInstance.getTime.toString + ", Price
on "+ticker+" hit " +price.toString
               //java.awt.Toolkit.getDefaultToolkit().beep()
               println(sqltext)
             }
           }

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 7 Aug 2018 at 17:07, Mich Talebzadeh <mi...@gmail.com>
wrote:

> Hi,
>
> The following works fine
>
>    tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker,
> 'timeissued, 'price)
>     val result = tableEnv.scan("priceTable").filter('ticker === "VOD" &&
> 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)
>     val r = result.toDataStream[Row]
>     r.print()
>
> Now I would like to get the individual column values from priceTable into
> local variables
>
> This does not seem to work
>
> val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
> val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>  val timeissued =
> tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>  val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>
> What alternatives are there?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>